Repository: beam Updated Branches: refs/heads/master 3711c0caf -> 844e53e34
DataflowRunner: automatically determine container image type Even if user supplies a base container image policy. Of course, preserve the ability of the user to fully override the image. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b7593b0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b7593b0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b7593b0 Branch: refs/heads/master Commit: 6b7593b0a3cd0514145bb6002fff0958a6630303 Parents: 3711c0c Author: Dan Halperin <[email protected]> Authored: Mon Apr 3 09:25:57 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon Apr 3 15:58:07 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 18 ++++++++++---- .../options/DataflowPipelineDebugOptions.java | 2 +- .../DataflowPipelineWorkerPoolOptions.java | 10 +------- .../runners/dataflow/DataflowRunnerTest.java | 25 ++++++++++++++++++++ 4 files changed, 41 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ea96ae8..6eec8f8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -68,7 +68,6 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecificat import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.runners.dataflow.util.DataflowTemplateJob; import org.apache.beam.runners.dataflow.util.DataflowTransport; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -543,9 +542,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // Set the Docker container image that executes Dataflow worker harness, residing in Google // Container Registry. Translator is guaranteed to create a worker pool prior to this point. - String workerHarnessContainerImage = - options.as(DataflowPipelineWorkerPoolOptions.class) - .getWorkerHarnessContainerImage(); + String workerHarnessContainerImage = getContainerImageForJob(options); for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) { workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage); } @@ -1341,4 +1338,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } + @VisibleForTesting + static String getContainerImageForJob(DataflowPipelineOptions options) { + String workerHarnessContainerImage = options.getWorkerHarnessContainerImage(); + if (!workerHarnessContainerImage.contains("IMAGE")) { + return workerHarnessContainerImage; + } else if (hasExperiment(options, "beam_fn_api")) { + return workerHarnessContainerImage.replace("IMAGE", "java"); + } else if (options.isStreaming()) { + return workerHarnessContainerImage.replace("IMAGE", "beam-java-streaming"); + } else { + return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch"); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 729bca4..d0ea722 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -56,7 +56,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions { @Experimental @Nullable List<String> getExperiments(); - void setExperiments(List<String> value); + void setExperiments(@Nullable List<String> value); /** * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index e2c4bf4..00d2194 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.options; import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.List; import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.DataflowRunnerInfo; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Default; @@ -129,15 +128,8 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions { implements DefaultValueFactory<String> { @Override public String create(PipelineOptions options) { - DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion(); - String containerType; - if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api")) { - containerType = "java"; - } else { - containerType = dataflowOptions.isStreaming() ? "beam-java-streaming" : "beam-java-batch"; - } - return String.format("dataflow.gcr.io/v1beta3/%s:%s", containerType, containerVersion); + return String.format("dataflow.gcr.io/v1beta3/IMAGE:%s", containerVersion); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 0735b5c..79a96e7 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow; +import static org.apache.beam.runners.dataflow.DataflowRunner.getContainerImageForJob; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -1103,4 +1104,28 @@ public class DataflowRunnerTest { assertFalse(DataflowRunner.hasExperiment(options, "ba")); assertFalse(DataflowRunner.hasExperiment(options, "BAR")); } + + @Test + public void testWorkerHarnessContainerImage() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + + // default image set + options.setWorkerHarnessContainerImage("some-container"); + assertThat(getContainerImageForJob(options), equalTo("some-container")); + + // batch, legacy + options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo"); + options.setExperiments(null); + options.setStreaming(false); + assertThat( + getContainerImageForJob(options), equalTo("gcr.io/beam-java-batch/foo")); + // streaming, legacy + options.setStreaming(true); + assertThat( + getContainerImageForJob(options), equalTo("gcr.io/beam-java-streaming/foo")); + // streaming, fnapi + options.setExperiments(ImmutableList.of("experiment1", "beam_fn_api")); + assertThat( + getContainerImageForJob(options), equalTo("gcr.io/java/foo")); + } }
