This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.39.0 by this push:
new 1b5086ca77d Cherry-pick pull request #17394: [BEAM-14014] Add parameter
for service account impersonation in GCP credentials (#17597)
1b5086ca77d is described below
commit 1b5086ca77d6a667615224ddb865a85490d735da
Author: Yichi Zhang <[email protected]>
AuthorDate: Tue May 10 11:19:37 2022 -0700
Cherry-pick pull request #17394: [BEAM-14014] Add parameter for service
account impersonation in GCP credentials (#17597)
Co-authored-by: Kenn Knowles <[email protected]>
---
.../examples/build.gradle | 54 ++++++++++++++++++----
.../sdk/extensions/gcp/auth/CredentialFactory.java | 2 +
.../extensions/gcp/auth/GcpCredentialFactory.java | 51 +++++++++++++++++---
.../sdk/extensions/gcp/options/GcpOptions.java | 19 ++++++++
4 files changed, 111 insertions(+), 15 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/examples/build.gradle
b/runners/google-cloud-dataflow-java/examples/build.gradle
index 21f80630897..36379f63cff 100644
--- a/runners/google-cloud-dataflow-java/examples/build.gradle
+++ b/runners/google-cloud-dataflow-java/examples/build.gradle
@@ -42,7 +42,28 @@ def dockerJavaImageName =
project(':runners:google-cloud-dataflow-java').ext.doc
// If -PuseExecutableStage is set, the use_executable_stage_bundle_execution
wil be enabled.
def fnapiExperiments = project.hasProperty('useExecutableStage') ?
'beam_fn_api_use_deprecated_read,use_executable_stage_bundle_execution' :
"beam_fn_api,beam_fn_api_use_deprecated_read"
-def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '',
additionalOptions = [] ->
+// For testing impersonation, we use three ingredients:
+// - a principal to impersonate
+// - a dataflow service account that only that principal is allowed to launch
jobs as
+// - a temp root that only the above two accounts have access to
+//
+// Jenkins and Dataflow workers both run as GCE default service account. So we
remove that account from all the above.
+def impersonateServiceAccount =
project.findProperty('gcpImpersonateServiceAccount') ?:
'[email protected]'
+def dataflowWorkerImpersonationServiceAccount =
project.findProperty('dataflowWorkerImpersonationServiceAccount') ?:
+
"impersonation-dataflow-wor...@apache-beam-testing.iam.gserviceaccount.com"
+def impersonationTempRoot = project.findProperty('gcpImpersonationTempRoot')
?: 'gs://impersonation-test-bucket/tmproot'
+
+
+def commonConfig = { Map args ->
+ if (!args.dataflowWorkerJar) {
+ throw new GradleException("Dataflow integration test configuration
requires dataflowWorkerJar parameter")
+ }
+
+ def actualDataflowWorkerJar = args.dataflowWorkerJar
+ def actualWorkerHarnessContainerImage = args.workerHarnessContainerImage
?: ''
+ def actualGcsTempRoot = args.gcsTempRoot ?: gcsTempRoot
+ def additionalOptions = args.additionalOptions ?: []
+
// return the preevaluated configuration closure
return {
testClassesDirs =
files(project(":examples:java").sourceSets.test.output.classesDirs)
@@ -54,10 +75,10 @@ def commonConfig = { dataflowWorkerJar,
workerHarnessContainerImage = '', additi
def preCommitBeamTestPipelineOptions = [
"--project=${gcpProject}",
"--region=${gcpRegion}",
- "--tempRoot=${gcsTempRoot}",
+ "--tempRoot=${actualGcsTempRoot}",
"--runner=TestDataflowRunner",
- "--dataflowWorkerJar=${dataflowWorkerJar}",
- "--workerHarnessContainerImage=${workerHarnessContainerImage}"
+ "--dataflowWorkerJar=${actualDataflowWorkerJar}",
+
"--workerHarnessContainerImage=${actualWorkerHarnessContainerImage}"
] + additionalOptions
systemProperty "beamTestPipelineOptions",
JsonOutput.toJson(preCommitBeamTestPipelineOptions)
}
@@ -66,14 +87,30 @@ def commonConfig = { dataflowWorkerJar,
workerHarnessContainerImage = '', additi
task preCommitLegacyWorker(type: Test) {
dependsOn
":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
- with commonConfig(dataflowWorkerJar)
+ with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
+}
+
+task preCommitLegacyWorkerImpersonate(type: Test) {
+ dependsOn
":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
+ def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
+ with commonConfig(
+ dataflowWorkerJar: dataflowWorkerJar,
+ gcsTempRoot: impersonationTempRoot,
+ additionalOptions: [
+ "--impersonateServiceAccount=${impersonateServiceAccount}",
+ "--serviceAccount=${dataflowWorkerImpersonationServiceAccount}"
+ ])
}
task verifyFnApiWorker(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
dependsOn ":runners:google-cloud-dataflow-java:buildAndPushDockerContainer"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
- with commonConfig(dataflowWorkerJar, dockerJavaImageName,
["--experiments=${fnapiExperiments}"])
+ with commonConfig(
+ dataflowWorkerJar: dataflowWorkerJar,
+ workerHarnessContainerImage: dockerJavaImageName,
+ additionalOptions: ["--experiments=${fnapiExperiments}"]
+ )
useJUnit {
excludeCategories 'org.apache.beam.sdk.testing.StreamingIT'
}
@@ -83,7 +120,7 @@ task postCommitLegacyWorkerJava11(type: Test) {
dependsOn
":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
systemProperty "java.specification.version", "11"
- with commonConfig(dataflowWorkerJar)
+ with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
}
task java11PostCommit() {
@@ -94,7 +131,7 @@ task postCommitLegacyWorkerJava17(type: Test) {
dependsOn
":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
systemProperty "java.specification.version", "17"
- with commonConfig(dataflowWorkerJar)
+ with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
}
task java17PostCommit() {
@@ -103,6 +140,7 @@ task java17PostCommit() {
task preCommit() {
dependsOn preCommitLegacyWorker
+ dependsOn preCommitLegacyWorkerImpersonate
}
task verifyPortabilityApi() {
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
index 3dc8afc5fe8..6e1e71dbb4e 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.extensions.gcp.auth;
import com.google.auth.Credentials;
import java.io.IOException;
import java.security.GeneralSecurityException;
+import org.checkerframework.checker.nullness.qual.Nullable;
/** Construct an oauth credential to be used by the SDK and the SDK workers. */
public interface CredentialFactory {
+ @Nullable
Credentials getCredential() throws IOException, GeneralSecurityException;
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
index e7193da1c6b..71685e3f1ab 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
@@ -17,20 +17,22 @@
*/
package org.apache.beam.sdk.extensions.gcp.auth;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ImpersonatedCredentials;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Construct an oauth credential to be used by the SDK and the SDK workers.
Returns a GCP
* credential.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
public class GcpCredentialFactory implements CredentialFactory {
/**
* The scope cloud-platform provides access to all Cloud Platform resources.
cloud-platform isn't
@@ -50,17 +52,52 @@ public class GcpCredentialFactory implements
CredentialFactory {
"https://www.googleapis.com/auth/bigquery.insertdata",
"https://www.googleapis.com/auth/pubsub");
- private static final GcpCredentialFactory INSTANCE = new
GcpCredentialFactory();
+ // If non-null, a list of service account emails to be used as an
impersonation chain.
+ private @Nullable List<String> impersonateServiceAccountChain;
+
+ private GcpCredentialFactory(@Nullable List<String>
impersonateServiceAccountChain) {
+ if (impersonateServiceAccountChain != null) {
+ checkArgument(impersonateServiceAccountChain.size() > 0);
+ }
+
+ this.impersonateServiceAccountChain = impersonateServiceAccountChain;
+ }
public static GcpCredentialFactory fromOptions(PipelineOptions options) {
- return INSTANCE;
+ @Nullable
+ String impersonateServiceAccountArg =
+ options.as(GcpOptions.class).getImpersonateServiceAccount();
+
+ @Nullable
+ List<String> impersonateServiceAccountChain =
+ impersonateServiceAccountArg == null
+ ? null
+ : Arrays.asList(impersonateServiceAccountArg.split(","));
+
+ return new GcpCredentialFactory(impersonateServiceAccountChain);
}
/** Returns a default GCP {@link Credentials} or null when it fails. */
@Override
- public Credentials getCredential() {
+ public @Nullable Credentials getCredential() {
try {
- return GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
+ GoogleCredentials applicationDefaultCredentials =
+ GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
+
+ if (impersonateServiceAccountChain == null) {
+ return applicationDefaultCredentials;
+ } else {
+ String targetPrincipal =
+
impersonateServiceAccountChain.get(impersonateServiceAccountChain.size() - 1);
+ List<String> delegationChain =
+ impersonateServiceAccountChain.subList(0,
impersonateServiceAccountChain.size() - 1);
+
+ GoogleCredentials impersonationCredentials =
+ ImpersonatedCredentials.create(
+ applicationDefaultCredentials, targetPrincipal,
delegationChain, SCOPES, 0);
+
+ return impersonationCredentials;
+ }
} catch (IOException e) {
// Ignore the exception
// Pipelines that only access to public data should be able to run
without credentials.
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index f593e2f5c9d..0f1afec29be 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -169,6 +169,25 @@ public interface GcpOptions extends GoogleApiDebugOptions,
PipelineOptions {
void setGcpCredential(Credentials value);
+ /**
+ * All API requests will be made as the given service account or target
service account in an
+ * impersonation delegation chain instead of the currently selected account.
You can specify
+ * either a single service account as the impersonator, or a comma-separated
list of service
+ * accounts to create an impersonation delegation chain.
+ */
+ @Description(
+ "All API requests will be made as the given service account or"
+ + " target service account in an impersonation delegation chain"
+ + " instead of the currently selected account. You can specify"
+ + " either a single service account as the impersonator, or a"
+ + " comma-separated list of service accounts to create an"
+ + " impersonation delegation chain.")
+ @JsonIgnore
+ @Nullable
+ String getImpersonateServiceAccount();
+
+ void setImpersonateServiceAccount(String impersonateServiceAccount);
+
/** Experiment to turn on the Streaming Engine experiment. */
String STREAMING_ENGINE_EXPERIMENT = "enable_streaming_engine";