Repository: beam Updated Branches: refs/heads/master 09f68159d -> 710941e90
Temporarily disable Dataflow pipeline_url metadata Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/324dae73 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/324dae73 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/324dae73 Branch: refs/heads/master Commit: 324dae7345de220cad9f8df7b7952d076bb36185 Parents: 5fb30ec Author: Kenneth Knowles <[email protected]> Authored: Sat Oct 28 16:04:04 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sun Oct 29 20:49:27 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 6 ++++-- .../apache/beam/runners/dataflow/DataflowRunnerTest.java | 11 +++++------ .../apache_beam/runners/dataflow/internal/apiclient.py | 10 ++++++---- 3 files changed, 15 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/324dae73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 545321d..334c8e5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -571,8 +571,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { String workerHarnessContainerImage = getContainerImageForJob(options); for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) { workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage); - workerPool.setMetadata( - ImmutableMap.of(STAGED_PIPELINE_METADATA_PROPERTY, stagedPipeline.getLocation())); + + // https://issues.apache.org/jira/browse/BEAM-3116 + // workerPool.setMetadata( + // ImmutableMap.of(STAGED_PIPELINE_METADATA_PROPERTY, stagedPipeline.getLocation())); } newJob.getEnvironment().setVersion(getEnvironmentVersion(options)); http://git-wip-us.apache.org/repos/asf/beam/blob/324dae73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 1568eda..66cf11d 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; @@ -46,7 +45,6 @@ import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; -import com.google.api.services.dataflow.model.WorkerPool; import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -166,10 +164,11 @@ public class DataflowRunnerTest implements Serializable { assertNull(job.getCurrentState()); assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName())); - for (WorkerPool workerPool : job.getEnvironment().getWorkerPools()) { - assertThat(workerPool.getMetadata(), - hasKey(DataflowRunner.STAGED_PIPELINE_METADATA_PROPERTY)); - } + // https://issues.apache.org/jira/browse/BEAM-3116 + // for (WorkerPool workerPool : job.getEnvironment().getWorkerPools()) { + // assertThat(workerPool.getMetadata(), + // hasKey(DataflowRunner.STAGED_PIPELINE_METADATA_PROPERTY)); + // } } @Before http://git-wip-us.apache.org/repos/asf/beam/blob/324dae73/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index d225503..3aa563d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -190,15 +190,17 @@ class Environment(object): pool = dataflow.WorkerPool( kind='local' if self.local else 'harness', packages=package_descriptors, - metadata=dataflow.WorkerPool.MetadataValue(), + # https://issues.apache.org/jira/browse/BEAM-3116 + # metadata=dataflow.WorkerPool.MetadataValue(), taskrunnerSettings=dataflow.TaskRunnerSettings( parallelWorkerSettings=dataflow.WorkerSettings( baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT, servicePath=self.google_cloud_options.dataflow_endpoint))) - pool.metadata.additionalProperties.append( - dataflow.WorkerPool.MetadataValue.AdditionalProperty( - key=names.STAGED_PIPELINE_URL_METADATA_FIELD, value=pipeline_url)) + # https://issues.apache.org/jira/browse/BEAM-3116 + # pool.metadata.additionalProperties.append( + # dataflow.WorkerPool.MetadataValue.AdditionalProperty( + # key=names.STAGED_PIPELINE_URL_METADATA_FIELD, value=pipeline_url)) pool.autoscalingSettings = dataflow.AutoscalingSettings() # Set worker pool options received through command line.
