Add assertion that valid jobs must have staged pipeline
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cef997ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cef997ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cef997ff

Branch: refs/heads/master
Commit: cef997ff06629a2c77b5aeb4f9ad40d8c4b3b22c
Parents: 090c512
Author: Kenneth Knowles <[email protected]>
Authored: Wed Oct 18 06:49:13 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Wed Oct 18 13:02:25 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 3 ++-
 .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 7 +++++++
 2 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/cef997ff/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ecef072..545321d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -192,7 +192,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   @VisibleForTesting
   static final String PIPELINE_FILE_NAME = "pipeline.pb";
 
-  private static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url";
+  @VisibleForTesting
+  static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url";
 
   private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
 
http://git-wip-us.apache.org/repos/asf/beam/blob/cef997ff/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 5bc798a..02abc34 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
@@ -45,6 +46,7 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.api.services.dataflow.model.WorkerPool;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
@@ -163,6 +165,11 @@ public class DataflowRunnerTest implements Serializable {
     assertNull(job.getId());
     assertNull(job.getCurrentState());
     assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
+
+    for (WorkerPool workerPool : job.getEnvironment().getWorkerPools()) {
+      assertThat(workerPool.getMetadata(),
+          hasKey(DataflowRunner.STAGED_PIPELINE_METADATA_PROPERTY));
+    }
   }
 
   @Before
