Repository: beam Updated Branches: refs/heads/master 8f5f19d11 -> 0a6211b56
DataflowRunner: experimental support for issuing FnAPI-based jobs Also clean up some code around checking for existence of experiments. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/131c9f91 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/131c9f91 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/131c9f91 Branch: refs/heads/master Commit: 131c9f916dae6345ec77a869112ae5901b568f23 Parents: 8f5f19d Author: Dan Halperin <[email protected]> Authored: Wed Mar 1 23:06:11 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Thu Mar 2 18:42:19 2017 -0800 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 3 +- .../dataflow/DataflowPipelineTranslator.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 46 ++++++++++++-------- .../runners/dataflow/DataflowRunnerInfo.java | 38 ++++++++-------- .../options/DataflowPipelineDebugOptions.java | 2 + .../DataflowPipelineWorkerPoolOptions.java | 10 +++-- .../beam/runners/dataflow/dataflow.properties | 8 ++-- .../dataflow/DataflowRunnerInfoTest.java | 23 +++++----- .../runners/dataflow/DataflowRunnerTest.java | 17 ++++++++ 9 files changed, 92 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index fdd088f..fb06797 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -34,7 +34,8 @@ <properties> <dataflow.container_version>beam-master-20170228</dataflow.container_version> - <dataflow.environment_major_version>6</dataflow.environment_major_version> + 
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version> + <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version> </properties> <build> http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 7e559e9..06e5048 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -327,8 +327,7 @@ public class DataflowPipelineTranslator { workerPool.setNumWorkers(options.getNumWorkers()); if (options.isStreaming() - && (options.getExperiments() == null - || !options.getExperiments().contains("enable_windmill_service"))) { + && !DataflowRunner.hasExperiment(options, "enable_windmill_service")) { // Use separate data disk for streaming. 
Disk disk = new Disk(); disk.setDiskType(options.getWorkerDiskType()); http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index dbf1958..50b6b4f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -51,7 +51,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -307,14 +306,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransformMatchers.parDoWithFnType(unsupported), UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true))); } - if (options.getExperiments() == null - || !options.getExperiments().contains("enable_custom_pubsub_source")) { + if (!hasExperiment(options, "enable_custom_pubsub_source")) { ptoverrides.put( PTransformMatchers.classEqualTo(PubsubUnboundedSource.class), new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this)); } - if (options.getExperiments() == null - || !options.getExperiments().contains("enable_custom_pubsub_sink")) { + if (!hasExperiment(options, "enable_custom_pubsub_sink")) { ptoverrides.put( PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), new StreamingPubsubIOWriteOverrideFactory(this)); @@ -559,20 +556,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { 
workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage); } - // Requirements about the service. - Map<String, Object> environmentVersion = new HashMap<>(); - environmentVersion.put( - PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, - DataflowRunnerInfo.getDataflowRunnerInfo().getEnvironmentMajorVersion()); - newJob.getEnvironment().setVersion(environmentVersion); - // Default jobType is JAVA_BATCH_AUTOSCALING: A Java job with workers that the job can - // autoscale if specified. - String jobType = "JAVA_BATCH_AUTOSCALING"; - - if (options.isStreaming()) { - jobType = "STREAMING"; - } - environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType); + newJob.getEnvironment().setVersion(getEnvironmentVersion(options)); if (hooks != null) { hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment()); @@ -680,6 +664,30 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { return dataflowPipelineJob; } + /** Returns true if the specified experiment is enabled, handling null experiments. */ + public static boolean hasExperiment(DataflowPipelineDebugOptions options, String experiment) { + List<String> experiments = + firstNonNull(options.getExperiments(), Collections.<String>emptyList()); + return experiments.contains(experiment); + } + + /** Helper to configure the Dataflow Job Environment based on the user's job options. */ + private static Map<String, Object> getEnvironmentVersion(DataflowPipelineOptions options) { + DataflowRunnerInfo runnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); + String majorVersion; + String jobType; + if (hasExperiment(options, "beam_fn_api")) { + majorVersion = runnerInfo.getFnApiEnvironmentMajorVersion(); + jobType = options.isStreaming() ? "FNAPI_STREAMING" : "FNAPI_BATCH"; + } else { + majorVersion = runnerInfo.getLegacyEnvironmentMajorVersion(); + jobType = options.isStreaming() ? 
"STREAMING" : "JAVA_BATCH_AUTOSCALING"; + } + return ImmutableMap.<String, Object>of( + PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, majorVersion, + PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType); + } + @VisibleForTesting void replaceTransforms(Pipeline pipeline) { for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) { http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java index 59cb8a4..12b3f38 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java @@ -47,32 +47,34 @@ public final class DataflowRunnerInfo { private Properties properties; - private static final String ENVIRONMENT_MAJOR_VERSION_KEY = "environment.major.version"; - private static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY = "worker.image.batch"; - private static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY = - "worker.image.streaming"; + private static final String FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY = + "fnapi.environment.major.version"; + private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY = + "legacy.environment.major.version"; + private static final String CONTAINER_VERSION_KEY = "container.version"; - /** Provides the environment's major version number. */ - public String getEnvironmentMajorVersion() { + /** Provides the legacy environment's major version number. 
*/ + public String getLegacyEnvironmentMajorVersion() { checkState( - properties.containsKey(ENVIRONMENT_MAJOR_VERSION_KEY), "Unknown environment major version"); - return properties.getProperty(ENVIRONMENT_MAJOR_VERSION_KEY); + properties.containsKey(LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY), + "Unknown legacy environment major version"); + return properties.getProperty(LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY); } - /** Provides the batch worker harness container image name. */ - public String getBatchWorkerHarnessContainerImage() { + /** Provides the FnAPI environment's major version number. */ + public String getFnApiEnvironmentMajorVersion() { checkState( - properties.containsKey(BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY), - "Unknown batch worker harness container image"); - return properties.getProperty(BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY); + properties.containsKey(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY), + "Unknown FnAPI environment major version"); + return properties.getProperty(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY); } - /** Provides the streaming worker harness container image name. */ - public String getStreamingWorkerHarnessContainerImage() { + /** Provides the container version that will be used for constructing harness image paths. 
*/ + public String getContainerVersion() { checkState( - properties.containsKey(STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY), - "Unknown streaming worker harness container image"); - return properties.getProperty(STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY); + properties.containsKey(CONTAINER_VERSION_KEY), + "Unknown container version"); + return properties.getProperty(CONTAINER_VERSION_KEY); } private DataflowRunnerInfo(String resourcePath) { http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index cdfa3f5..729bca4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.api.services.dataflow.Dataflow; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.util.DataflowTransport; import org.apache.beam.runners.dataflow.util.GcsStager; import org.apache.beam.runners.dataflow.util.Stager; @@ -53,6 +54,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions { + "be enabled with this flag. 
Please sync with the Dataflow team before enabling any " + "experiments.") @Experimental + @Nullable List<String> getExperiments(); void setExperiments(List<String> value); http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index 3c5d05a..e2c4bf4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.options; import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.List; import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.DataflowRunnerInfo; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Default; @@ -129,11 +130,14 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions { @Override public String create(PipelineOptions options) { DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - if (dataflowOptions.isStreaming()) { - return DataflowRunnerInfo.getDataflowRunnerInfo().getStreamingWorkerHarnessContainerImage(); + String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion(); + String containerType; + if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api")) { + containerType = "java"; } else { - 
return DataflowRunnerInfo.getDataflowRunnerInfo().getBatchWorkerHarnessContainerImage(); + containerType = dataflowOptions.isStreaming() ? "beam-java-streaming" : "beam-java-batch"; } + return String.format("dataflow.gcr.io/v1beta3/%s:%s", containerType, containerVersion); } } http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties index 47e316c..ac68970 100644 --- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties +++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties @@ -16,8 +16,6 @@ # # Dataflow runtime properties -environment.major.version=${dataflow.environment_major_version} - -worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:${dataflow.container_version} - -worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:${dataflow.container_version} +legacy.environment.major.version=${dataflow.legacy_environment_major_version} +fnapi.environment.major.version=${dataflow.fnapi_environment_major_version} +container.version=${dataflow.container_version} http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java index 
9b5b374..3502040 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -32,20 +33,22 @@ public class DataflowRunnerInfoTest { public void getDataflowRunnerInfo() throws Exception { DataflowRunnerInfo info = DataflowRunnerInfo.getDataflowRunnerInfo(); - String version = info.getEnvironmentMajorVersion(); + String version = info.getLegacyEnvironmentMajorVersion(); // Validate major version is a number assertTrue( - String.format("Environment major version number %s is not a number", version), + String.format("Legacy environment major version number %s is not a number", version), version.matches("\\d+")); - // Validate container images contain gcr.io - assertThat( - "batch worker harness container image invalid", - info.getBatchWorkerHarnessContainerImage(), - containsString("gcr.io")); + version = info.getFnApiEnvironmentMajorVersion(); + // Validate major version is a number + assertTrue( + String.format("FnAPI environment major version number %s is not a number", version), + version.matches("\\d+")); + + // Validate container version does not contain a $ (indicating it was not filled in). 
assertThat( - "streaming worker harness container image invalid", - info.getStreamingWorkerHarnessContainerImage(), - containsString("gcr.io")); + "container version invalid", + info.getContainerVersion(), + not(containsString("$"))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index a788077..246feb0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -1118,4 +1119,20 @@ public class DataflowRunnerTest { thrown.expect(RuntimeException.class); p.run(); } + + @Test + public void testHasExperiment() { + DataflowPipelineDebugOptions options = + PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class); + + options.setExperiments(null); + assertFalse(DataflowRunner.hasExperiment(options, "foo")); + + options.setExperiments(ImmutableList.of("foo", "bar")); + assertTrue(DataflowRunner.hasExperiment(options, "foo")); + assertTrue(DataflowRunner.hasExperiment(options, "bar")); + assertFalse(DataflowRunner.hasExperiment(options, "baz")); + assertFalse(DataflowRunner.hasExperiment(options, "ba")); + 
assertFalse(DataflowRunner.hasExperiment(options, "BAR")); + } }
