This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6c7bbdef7d9 Correctly override apache/beam containers for RC on Dataflow runner job submission (#36199) 6c7bbdef7d9 is described below commit 6c7bbdef7d95d42f755bdfc4771d942e634cedc4 Author: Yi Hu <ya...@google.com> AuthorDate: Wed Sep 24 11:00:52 2025 -0400 Correctly override apache/beam containers for RC on Dataflow runner job submission (#36199) --- .../beam/runners/dataflow/DataflowRunner.java | 22 ++++- .../beam/runners/dataflow/DataflowRunnerTest.java | 96 ++++++++-------------- 2 files changed, 53 insertions(+), 65 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d25a37e92dc..de6a039b707 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1040,9 +1040,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { && !updated // don't update if the container image is already configured by DataflowRunner && !containerImage.equals(getContainerImageForJob(options))) { + String imageAndTag = + normalizeDataflowImageAndTag( + containerImage.substring(containerImage.lastIndexOf("/"))); containerImage = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository() - + containerImage.substring(containerImage.lastIndexOf("/")); + + imageAndTag; } environmentBuilder.setPayload( RunnerApi.DockerPayload.newBuilder() @@ -1055,6 +1058,23 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { return pipelineBuilder.build(); } + static String normalizeDataflowImageAndTag(String imageAndTag) { + if (imageAndTag.startsWith("/beam_java") + || imageAndTag.startsWith("/beam_python") + || 
imageAndTag.startsWith("/beam_go_")) { + int tagIdx = imageAndTag.lastIndexOf(":"); + if (tagIdx > 0) { + // For release candidates, apache/beam_ images has rc tag while Dataflow does not + String tag = imageAndTag.substring(tagIdx); // e,g, ":2.xx.0rc1" + int mayRc = tag.toLowerCase().lastIndexOf("rc"); + if (mayRc > 0) { + imageAndTag = imageAndTag.substring(0, tagIdx) + tag.substring(0, mayRc); + } + } + } + return imageAndTag; + } + @VisibleForTesting protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) { RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder(); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index c9bd50da0a5..db8fbd525ac 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1224,6 +1224,23 @@ public class DataflowRunnerTest implements Serializable { DataflowRunner.fromOptions(options); } + private static RunnerApi.Pipeline containerUrlToPipeline(String url) { + return RunnerApi.Pipeline.newBuilder() + .setComponents( + RunnerApi.Components.newBuilder() + .putEnvironments( + "env", + RunnerApi.Environment.newBuilder() + .setUrn(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)) + .setPayload( + RunnerApi.DockerPayload.newBuilder() + .setContainerImage(url) + .build() + .toByteString()) + .build())) + .build(); + } + @Test public void testApplySdkEnvironmentOverrides() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); @@ -1231,38 +1248,8 @@ public class DataflowRunnerTest implements Serializable { String gcrPythonContainerUrl = "gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest"; 
options.setSdkHarnessContainerImageOverrides(".*python.*," + gcrPythonContainerUrl); DataflowRunner runner = DataflowRunner.fromOptions(options); - RunnerApi.Pipeline pipeline = - RunnerApi.Pipeline.newBuilder() - .setComponents( - RunnerApi.Components.newBuilder() - .putEnvironments( - "env", - RunnerApi.Environment.newBuilder() - .setUrn( - BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)) - .setPayload( - RunnerApi.DockerPayload.newBuilder() - .setContainerImage(dockerHubPythonContainerUrl) - .build() - .toByteString()) - .build())) - .build(); - RunnerApi.Pipeline expectedPipeline = - RunnerApi.Pipeline.newBuilder() - .setComponents( - RunnerApi.Components.newBuilder() - .putEnvironments( - "env", - RunnerApi.Environment.newBuilder() - .setUrn( - BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)) - .setPayload( - RunnerApi.DockerPayload.newBuilder() - .setContainerImage(gcrPythonContainerUrl) - .build() - .toByteString()) - .build())) - .build(); + RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl); + RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl); assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline)); } @@ -1272,38 +1259,19 @@ public class DataflowRunnerTest implements Serializable { String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:latest"; String gcrPythonContainerUrl = "gcr.io/cloud-dataflow/v1beta3/beam_python3.9_sdk:latest"; DataflowRunner runner = DataflowRunner.fromOptions(options); - RunnerApi.Pipeline pipeline = - RunnerApi.Pipeline.newBuilder() - .setComponents( - RunnerApi.Components.newBuilder() - .putEnvironments( - "env", - RunnerApi.Environment.newBuilder() - .setUrn( - BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)) - .setPayload( - RunnerApi.DockerPayload.newBuilder() - .setContainerImage(dockerHubPythonContainerUrl) - .build() - .toByteString()) - .build())) - 
.build(); - RunnerApi.Pipeline expectedPipeline = - RunnerApi.Pipeline.newBuilder() - .setComponents( - RunnerApi.Components.newBuilder() - .putEnvironments( - "env", - RunnerApi.Environment.newBuilder() - .setUrn( - BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)) - .setPayload( - RunnerApi.DockerPayload.newBuilder() - .setContainerImage(gcrPythonContainerUrl) - .build() - .toByteString()) - .build())) - .build(); + RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl); + RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl); + assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline)); + } + + @Test + public void testApplySdkEnvironmentOverridesRcByDefault() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:2.68.0rc2"; + String gcrPythonContainerUrl = "gcr.io/cloud-dataflow/v1beta3/beam_python3.9_sdk:2.68.0"; + DataflowRunner runner = DataflowRunner.fromOptions(options); + RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl); + RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl); assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline)); }