This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.39.0 by this push:
     new 1b5086ca77d Cherry-pick pull request #17394: [BEAM-14014] Add parameter 
for service account impersonation in GCP credentials (#17597)
1b5086ca77d is described below

commit 1b5086ca77d6a667615224ddb865a85490d735da
Author: Yichi Zhang <[email protected]>
AuthorDate: Tue May 10 11:19:37 2022 -0700

    Cherry-pick pull request #17394: [BEAM-14014] Add parameter for service 
account impersonation in GCP credentials (#17597)
    
    Co-authored-by: Kenn Knowles <[email protected]>
---
 .../examples/build.gradle                          | 54 ++++++++++++++++++----
 .../sdk/extensions/gcp/auth/CredentialFactory.java |  2 +
 .../extensions/gcp/auth/GcpCredentialFactory.java  | 51 +++++++++++++++++---
 .../sdk/extensions/gcp/options/GcpOptions.java     | 19 ++++++++
 4 files changed, 111 insertions(+), 15 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/examples/build.gradle 
b/runners/google-cloud-dataflow-java/examples/build.gradle
index 21f80630897..36379f63cff 100644
--- a/runners/google-cloud-dataflow-java/examples/build.gradle
+++ b/runners/google-cloud-dataflow-java/examples/build.gradle
@@ -42,7 +42,28 @@ def dockerJavaImageName = 
project(':runners:google-cloud-dataflow-java').ext.doc
 // If -PuseExecutableStage is set, the use_executable_stage_bundle_execution 
wil be enabled.
 def fnapiExperiments = project.hasProperty('useExecutableStage') ? 
'beam_fn_api_use_deprecated_read,use_executable_stage_bundle_execution' : 
"beam_fn_api,beam_fn_api_use_deprecated_read"
 
-def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', 
additionalOptions = [] ->
+// For testing impersonation, we use three ingredients:
+//  - a principal to impersonate
+//  - a dataflow service account that only that principal is allowed to launch 
jobs as
+//  - a temp root that only the above two accounts have access to
+//
+// Jenkins and Dataflow workers both run as GCE default service account. So we 
remove that account from all the above.
+def impersonateServiceAccount = 
project.findProperty('gcpImpersonateServiceAccount') ?: 
'[email protected]'
+def dataflowWorkerImpersonationServiceAccount = 
project.findProperty('dataflowWorkerImpersonationServiceAccount') ?:
+        
"impersonation-dataflow-wor...@apache-beam-testing.iam.gserviceaccount.com"
+def impersonationTempRoot = project.findProperty('gcpImpersonationTempRoot') 
?: 'gs://impersonation-test-bucket/tmproot'
+
+
+def commonConfig = { Map args ->
+    if (!args.dataflowWorkerJar) {
+        throw new GradleException("Dataflow integration test configuration 
requires dataflowWorkerJar parameter")
+    }
+
+    def actualDataflowWorkerJar = args.dataflowWorkerJar
+    def actualWorkerHarnessContainerImage = args.workerHarnessContainerImage 
?: ''
+    def actualGcsTempRoot = args.gcsTempRoot ?: gcsTempRoot
+    def additionalOptions = args.additionalOptions ?: []
+
    // return the preevaluated configuration closure
    return {
        testClassesDirs = 
files(project(":examples:java").sourceSets.test.output.classesDirs)
@@ -54,10 +75,10 @@ def commonConfig = { dataflowWorkerJar, 
workerHarnessContainerImage = '', additi
        def preCommitBeamTestPipelineOptions = [
                "--project=${gcpProject}",
                "--region=${gcpRegion}",
-               "--tempRoot=${gcsTempRoot}",
+               "--tempRoot=${actualGcsTempRoot}",
                "--runner=TestDataflowRunner",
-               "--dataflowWorkerJar=${dataflowWorkerJar}",
-               "--workerHarnessContainerImage=${workerHarnessContainerImage}"
+               "--dataflowWorkerJar=${actualDataflowWorkerJar}",
+               
"--workerHarnessContainerImage=${actualWorkerHarnessContainerImage}"
                ] + additionalOptions
        systemProperty "beamTestPipelineOptions", 
JsonOutput.toJson(preCommitBeamTestPipelineOptions)
      }
@@ -66,14 +87,30 @@ def commonConfig = { dataflowWorkerJar, 
workerHarnessContainerImage = '', additi
 task preCommitLegacyWorker(type: Test) {
   dependsOn 
":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
   def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: 
project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
-  with commonConfig(dataflowWorkerJar)
+  with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
+}
+
+task preCommitLegacyWorkerImpersonate(type: Test) {
+    dependsOn 
":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
+    def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: 
project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
+    with commonConfig(
+     dataflowWorkerJar: dataflowWorkerJar,
+         gcsTempRoot: impersonationTempRoot,
+         additionalOptions: [
+            "--impersonateServiceAccount=${impersonateServiceAccount}",
+            "--serviceAccount=${dataflowWorkerImpersonationServiceAccount}"
+    ])
 }
 
 task verifyFnApiWorker(type: Test) {
   dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
   dependsOn ":runners:google-cloud-dataflow-java:buildAndPushDockerContainer"
   def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: 
project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
-  with commonConfig(dataflowWorkerJar, dockerJavaImageName, 
["--experiments=${fnapiExperiments}"])
+  with commonConfig(
+          dataflowWorkerJar: dataflowWorkerJar,
+          workerHarnessContainerImage: dockerJavaImageName,
+          additionalOptions: ["--experiments=${fnapiExperiments}"]
+  )
   useJUnit {
     excludeCategories 'org.apache.beam.sdk.testing.StreamingIT'
   }
@@ -83,7 +120,7 @@ task postCommitLegacyWorkerJava11(type: Test) {
   dependsOn 
":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
   def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: 
project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
   systemProperty "java.specification.version", "11"
-  with commonConfig(dataflowWorkerJar)
+  with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
 }
 
 task java11PostCommit() {
@@ -94,7 +131,7 @@ task postCommitLegacyWorkerJava17(type: Test) {
     dependsOn 
":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
     def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: 
project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
     systemProperty "java.specification.version", "17"
-    with commonConfig(dataflowWorkerJar)
+    with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
 }
 
 task java17PostCommit() {
@@ -103,6 +140,7 @@ task java17PostCommit() {
 
 task preCommit() {
   dependsOn preCommitLegacyWorker
+  dependsOn preCommitLegacyWorkerImpersonate
 }
 
 task verifyPortabilityApi() {
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
index 3dc8afc5fe8..6e1e71dbb4e 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.extensions.gcp.auth;
 import com.google.auth.Credentials;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Construct an oauth credential to be used by the SDK and the SDK workers. */
 public interface CredentialFactory {
+  @Nullable
   Credentials getCredential() throws IOException, GeneralSecurityException;
 }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
index e7193da1c6b..71685e3f1ab 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java
@@ -17,20 +17,22 @@
  */
 package org.apache.beam.sdk.extensions.gcp.auth;
 
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auth.Credentials;
 import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ImpersonatedCredentials;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Construct an oauth credential to be used by the SDK and the SDK workers. 
Returns a GCP
  * credential.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public class GcpCredentialFactory implements CredentialFactory {
   /**
    * The scope cloud-platform provides access to all Cloud Platform resources. 
cloud-platform isn't
@@ -50,17 +52,52 @@ public class GcpCredentialFactory implements 
CredentialFactory {
           "https://www.googleapis.com/auth/bigquery.insertdata",
           "https://www.googleapis.com/auth/pubsub");
 
-  private static final GcpCredentialFactory INSTANCE = new 
GcpCredentialFactory();
+  // If non-null, a list of service account emails to be used as an 
impersonation chain.
+  private @Nullable List<String> impersonateServiceAccountChain;
+
+  private GcpCredentialFactory(@Nullable List<String> 
impersonateServiceAccountChain) {
+    if (impersonateServiceAccountChain != null) {
+      checkArgument(impersonateServiceAccountChain.size() > 0);
+    }
+
+    this.impersonateServiceAccountChain = impersonateServiceAccountChain;
+  }
 
   public static GcpCredentialFactory fromOptions(PipelineOptions options) {
-    return INSTANCE;
+    @Nullable
+    String impersonateServiceAccountArg =
+        options.as(GcpOptions.class).getImpersonateServiceAccount();
+
+    @Nullable
+    List<String> impersonateServiceAccountChain =
+        impersonateServiceAccountArg == null
+            ? null
+            : Arrays.asList(impersonateServiceAccountArg.split(","));
+
+    return new GcpCredentialFactory(impersonateServiceAccountChain);
   }
 
   /** Returns a default GCP {@link Credentials} or null when it fails. */
   @Override
-  public Credentials getCredential() {
+  public @Nullable Credentials getCredential() {
     try {
-      return GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
+      GoogleCredentials applicationDefaultCredentials =
+          GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
+
+      if (impersonateServiceAccountChain == null) {
+        return applicationDefaultCredentials;
+      } else {
+        String targetPrincipal =
+            
impersonateServiceAccountChain.get(impersonateServiceAccountChain.size() - 1);
+        List<String> delegationChain =
+            impersonateServiceAccountChain.subList(0, 
impersonateServiceAccountChain.size() - 1);
+
+        GoogleCredentials impersonationCredentials =
+            ImpersonatedCredentials.create(
+                applicationDefaultCredentials, targetPrincipal, 
delegationChain, SCOPES, 0);
+
+        return impersonationCredentials;
+      }
     } catch (IOException e) {
       // Ignore the exception
       // Pipelines that only access to public data should be able to run 
without credentials.
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index f593e2f5c9d..0f1afec29be 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -169,6 +169,25 @@ public interface GcpOptions extends GoogleApiDebugOptions, 
PipelineOptions {
 
   void setGcpCredential(Credentials value);
 
+  /**
+   * All API requests will be made as the given service account or target 
service account in an
+   * impersonation delegation chain instead of the currently selected account. 
You can specify
+   * either a single service account as the impersonator, or a comma-separated 
list of service
+   * accounts to create an impersonation delegation chain.
+   */
+  @Description(
+      "All API requests will be made as the given service account or"
+          + " target service account in an impersonation delegation chain"
+          + " instead of the currently selected account. You can specify"
+          + " either a single service account as the impersonator, or a"
+          + " comma-separated list of service accounts to create an"
+          + " impersonation delegation chain.")
+  @JsonIgnore
+  @Nullable
+  String getImpersonateServiceAccount();
+
+  void setImpersonateServiceAccount(String impersonateServiceAccount);
+
   /** Experiment to turn on the Streaming Engine experiment. */
   String STREAMING_ENGINE_EXPERIMENT = "enable_streaming_engine";
 

Reply via email to