This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c7bbdef7d9 Correctly override apache/beam containers for RC on Dataflow runner job submission (#36199)
6c7bbdef7d9 is described below

commit 6c7bbdef7d95d42f755bdfc4771d942e634cedc4
Author: Yi Hu <ya...@google.com>
AuthorDate: Wed Sep 24 11:00:52 2025 -0400

    Correctly override apache/beam containers for RC on Dataflow runner job submission (#36199)
---
 .../beam/runners/dataflow/DataflowRunner.java      | 22 ++++-
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 96 ++++++++--------------
 2 files changed, 53 insertions(+), 65 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d25a37e92dc..de6a039b707 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1040,9 +1040,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             && !updated
             // don't update if the container image is already configured by 
DataflowRunner
             && !containerImage.equals(getContainerImageForJob(options))) {
+          String imageAndTag =
+              normalizeDataflowImageAndTag(
+                  containerImage.substring(containerImage.lastIndexOf("/")));
           containerImage =
               
DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository()
-                  + containerImage.substring(containerImage.lastIndexOf("/"));
+                  + imageAndTag;
         }
         environmentBuilder.setPayload(
             RunnerApi.DockerPayload.newBuilder()
@@ -1055,6 +1058,23 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     return pipelineBuilder.build();
   }
 
+  static String normalizeDataflowImageAndTag(String imageAndTag) {
+    if (imageAndTag.startsWith("/beam_java")
+        || imageAndTag.startsWith("/beam_python")
+        || imageAndTag.startsWith("/beam_go_")) {
+      int tagIdx = imageAndTag.lastIndexOf(":");
+      if (tagIdx > 0) {
+        // For release candidates, apache/beam_ images has rc tag while 
Dataflow does not
+        String tag = imageAndTag.substring(tagIdx); // e,g, ":2.xx.0rc1"
+        int mayRc = tag.toLowerCase().lastIndexOf("rc");
+        if (mayRc > 0) {
+          imageAndTag = imageAndTag.substring(0, tagIdx) + tag.substring(0, 
mayRc);
+        }
+      }
+    }
+    return imageAndTag;
+  }
+
   @VisibleForTesting
   protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
     RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index c9bd50da0a5..db8fbd525ac 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1224,6 +1224,23 @@ public class DataflowRunnerTest implements Serializable {
     DataflowRunner.fromOptions(options);
   }
 
+  private static RunnerApi.Pipeline containerUrlToPipeline(String url) {
+    return RunnerApi.Pipeline.newBuilder()
+        .setComponents(
+            RunnerApi.Components.newBuilder()
+                .putEnvironments(
+                    "env",
+                    RunnerApi.Environment.newBuilder()
+                        
.setUrn(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
+                        .setPayload(
+                            RunnerApi.DockerPayload.newBuilder()
+                                .setContainerImage(url)
+                                .build()
+                                .toByteString())
+                        .build()))
+        .build();
+  }
+
   @Test
   public void testApplySdkEnvironmentOverrides() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
@@ -1231,38 +1248,8 @@ public class DataflowRunnerTest implements Serializable {
     String gcrPythonContainerUrl = 
"gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest";
     options.setSdkHarnessContainerImageOverrides(".*python.*," + 
gcrPythonContainerUrl);
     DataflowRunner runner = DataflowRunner.fromOptions(options);
-    RunnerApi.Pipeline pipeline =
-        RunnerApi.Pipeline.newBuilder()
-            .setComponents(
-                RunnerApi.Components.newBuilder()
-                    .putEnvironments(
-                        "env",
-                        RunnerApi.Environment.newBuilder()
-                            .setUrn(
-                                
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
-                            .setPayload(
-                                RunnerApi.DockerPayload.newBuilder()
-                                    
.setContainerImage(dockerHubPythonContainerUrl)
-                                    .build()
-                                    .toByteString())
-                            .build()))
-            .build();
-    RunnerApi.Pipeline expectedPipeline =
-        RunnerApi.Pipeline.newBuilder()
-            .setComponents(
-                RunnerApi.Components.newBuilder()
-                    .putEnvironments(
-                        "env",
-                        RunnerApi.Environment.newBuilder()
-                            .setUrn(
-                                
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
-                            .setPayload(
-                                RunnerApi.DockerPayload.newBuilder()
-                                    .setContainerImage(gcrPythonContainerUrl)
-                                    .build()
-                                    .toByteString())
-                            .build()))
-            .build();
+    RunnerApi.Pipeline pipeline = 
containerUrlToPipeline(dockerHubPythonContainerUrl);
+    RunnerApi.Pipeline expectedPipeline = 
containerUrlToPipeline(gcrPythonContainerUrl);
     assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), 
equalTo(expectedPipeline));
   }
 
@@ -1272,38 +1259,19 @@ public class DataflowRunnerTest implements Serializable {
     String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:latest";
     String gcrPythonContainerUrl = 
"gcr.io/cloud-dataflow/v1beta3/beam_python3.9_sdk:latest";
     DataflowRunner runner = DataflowRunner.fromOptions(options);
-    RunnerApi.Pipeline pipeline =
-        RunnerApi.Pipeline.newBuilder()
-            .setComponents(
-                RunnerApi.Components.newBuilder()
-                    .putEnvironments(
-                        "env",
-                        RunnerApi.Environment.newBuilder()
-                            .setUrn(
-                                
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
-                            .setPayload(
-                                RunnerApi.DockerPayload.newBuilder()
-                                    
.setContainerImage(dockerHubPythonContainerUrl)
-                                    .build()
-                                    .toByteString())
-                            .build()))
-            .build();
-    RunnerApi.Pipeline expectedPipeline =
-        RunnerApi.Pipeline.newBuilder()
-            .setComponents(
-                RunnerApi.Components.newBuilder()
-                    .putEnvironments(
-                        "env",
-                        RunnerApi.Environment.newBuilder()
-                            .setUrn(
-                                
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
-                            .setPayload(
-                                RunnerApi.DockerPayload.newBuilder()
-                                    .setContainerImage(gcrPythonContainerUrl)
-                                    .build()
-                                    .toByteString())
-                            .build()))
-            .build();
+    RunnerApi.Pipeline pipeline = 
containerUrlToPipeline(dockerHubPythonContainerUrl);
+    RunnerApi.Pipeline expectedPipeline = 
containerUrlToPipeline(gcrPythonContainerUrl);
+    assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), 
equalTo(expectedPipeline));
+  }
+
+  @Test
+  public void testApplySdkEnvironmentOverridesRcByDefault() throws IOException 
{
+    DataflowPipelineOptions options = buildPipelineOptions();
+    String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:2.68.0rc2";
+    String gcrPythonContainerUrl = 
"gcr.io/cloud-dataflow/v1beta3/beam_python3.9_sdk:2.68.0";
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    RunnerApi.Pipeline pipeline = 
containerUrlToPipeline(dockerHubPythonContainerUrl);
+    RunnerApi.Pipeline expectedPipeline = 
containerUrlToPipeline(gcrPythonContainerUrl);
     assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), 
equalTo(expectedPipeline));
   }
 

Reply via email to