Make test timeout configurable and use in TestDataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e82e35b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e82e35b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e82e35b Branch: refs/heads/master Commit: 7e82e35b2476db6b2b491861e5c2c52042ce2161 Parents: 5653b86 Author: Kenneth Knowles <k...@google.com> Authored: Wed Nov 9 19:24:44 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Nov 10 10:49:28 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/testing/TestDataflowRunner.java | 10 +++++++--- .../org/apache/beam/sdk/testing/TestPipelineOptions.java | 6 ++++++ 2 files changed, 13 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e82e35b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 10c72b7..70c3f58 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -135,10 +135,14 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } }); - State finalState = job.waitUntilFinish(Duration.standardMinutes(10L), messageHandler); + State finalState = + job.waitUntilFinish( + Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler); if (finalState == null || finalState == State.RUNNING) { - LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.", - job.getJobId()); + LOG.info( + "Dataflow job {} took longer than {} seconds to complete, cancelling.", + job.getJobId(), + options.getTestTimeoutSeconds()); job.cancel(); } success = resultFuture.get(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e82e35b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java index ff553ba..0739381 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.testing; +import javax.annotation.Nullable; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -41,6 +42,11 @@ public interface TestPipelineOptions extends PipelineOptions { SerializableMatcher<PipelineResult> getOnSuccessMatcher(); void setOnSuccessMatcher(SerializableMatcher<PipelineResult> value); + @Default.Long(10 * 60) + @Nullable + Long getTestTimeoutSeconds(); + void setTestTimeoutSeconds(Long value); + /** * Factory for {@link PipelineResult} matchers which always pass. */