This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 78be15f Add nexmark option to allow cancel streaming query job after complete
     new ea928a2 Merge pull request #12119 from y1chi/nexmark
78be15f is described below

commit 78be15f5efe11d9dd0476d181430b55569caccd0
Author: Yichi Zhang <zyi...@google.com>
AuthorDate: Mon Jun 29 11:25:35 2020 -0700

    Add nexmark option to allow cancel streaming query job after complete
---
 .../src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java | 3 +++
 .../src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java | 6 ++++++
 2 files changed, 9 insertions(+)

diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 66385af..619a517 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -480,6 +480,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
 && quietFor.isLongerThan(DONE_DELAY)) {
 NexmarkUtils.console("streaming query appears to have finished waiting for completion.");
 waitingForShutdown = true;
+ if (options.getCancelStreamingJobAfterFinish()) {
+ cancelJob = true;
+ }
 } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
 NexmarkUtils.console(
 "ERROR: streaming query appears to have been stuck for %d minutes, cancelling job.",
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index db9b97e..ea2be42 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -139,6 +139,12 @@ public interface NexmarkOptions
 void setStreamTimeout(Integer streamTimeout);
+ @Description("Proactively cancels streaming job after query is completed")
+ @Default.Boolean(false)
+ boolean getCancelStreamingJobAfterFinish();
+
+ void setCancelStreamingJobAfterFinish(boolean cancelStreamingJobAfterFinish);
+
 @Description("Number of unbounded sources to create events.")
 @Nullable
 Integer getNumEventGenerators();