[Beam-1001] Add non-blocking cancel() and waitUntilFinish() for streaming applications. Remove the timeout parameter from the Spark pipeline options.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dafd5be7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dafd5be7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dafd5be7 Branch: refs/heads/gearpump-runner Commit: dafd5be7f69f191fc9edb8b9f9aec010ca368f50 Parents: d93e9a8 Author: ksalant <[email protected]> Authored: Sun Nov 20 11:57:16 2016 +0200 Committer: Sela <[email protected]> Committed: Sun Nov 20 19:25:52 2016 +0200 ---------------------------------------------------------------------- .../runners/spark/SparkPipelineOptions.java | 6 -- .../apache/beam/runners/spark/SparkRunner.java | 5 +- .../spark/translation/EvaluationContext.java | 59 +++++++++++++------- .../SparkRunnerStreamingContextFactory.java | 3 +- .../streaming/EmptyStreamAssertionTest.java | 3 +- .../streaming/FlattenStreamingTest.java | 4 +- .../streaming/KafkaStreamingTest.java | 11 ++-- .../ResumeFromCheckpointStreamingTest.java | 3 +- .../streaming/SimpleStreamingWordCountTest.java | 4 +- .../streaming/utils/PAssertStreaming.java | 8 ++- .../SparkTestPipelineOptionsForStreaming.java | 1 - 11 files changed, 60 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index b1ebde9..0fd790e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -39,12 +39,6 @@ public interface SparkPipelineOptions 
String getSparkMaster(); void setSparkMaster(String master); - @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until " - + "execution is stopped") - @Default.Long(-1) - Long getTimeout(); - void setTimeout(Long timeoutMillis); - @Description("Batch interval for Spark streaming in milliseconds.") @Default.Long(1000) Long getBatchIntervalMillis(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 6bbef39..e800071 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -137,11 +137,8 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance. return contextFactory.getCtxt() == null ? 
new EvaluationContext(jssc.sc(), - pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt(); + pipeline, jssc) : contextFactory.getCtxt(); } else { - if (mOptions.getTimeout() > 0) { - LOG.info("Timeout is ignored by the SparkRunner in batch."); - } JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions); EvaluationContext ctxt = new EvaluationContext(jsc, pipeline); SparkPipelineTranslator translator = new TransformTranslator.Translator(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index aaf7573..1183fbb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.StreamingContextState; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.joda.time.Duration; @@ -57,7 +58,6 @@ public class EvaluationContext implements EvaluationResult { private JavaStreamingContext jssc; private final SparkRuntimeContext runtime; private final Pipeline pipeline; - private long timeout; private final Map<PValue, Dataset> datasets = new LinkedHashMap<>(); private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>(); private final Set<Dataset> leaves = new LinkedHashSet<>(); @@ -76,10 +76,9 @@ public class EvaluationContext implements EvaluationResult { } public 
EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, - JavaStreamingContext jssc, long timeout) { + JavaStreamingContext jssc) { this(jsc, pipeline); this.jssc = jssc; - this.timeout = timeout; this.state = State.RUNNING; } @@ -226,18 +225,14 @@ public class EvaluationContext implements EvaluationResult { @Override public void close(boolean gracefully) { - if (isStreamingPipeline()) { - // stop streaming context - if (timeout > 0) { - jssc.awaitTerminationOrTimeout(timeout); - } else { - jssc.awaitTermination(); + // Stopping streaming job if running + if (isStreamingPipeline() && !state.isTerminal()) { + try { + cancel(gracefully); + } catch (IOException e) { + throw new RuntimeException("Failed to cancel streaming job", e); } - // stop streaming context gracefully, so checkpointing (and other computations) get to - // finish before shutdown. - jssc.stop(false, gracefully); } - state = State.DONE; SparkContextFactory.stopSparkContext(jsc); } @@ -248,21 +243,47 @@ public class EvaluationContext implements EvaluationResult { @Override public State cancel() throws IOException { - throw new UnsupportedOperationException( - "Spark runner EvaluationContext does not support cancel."); + return cancel(true); + } + + private State cancel(boolean gracefully) throws IOException { + if (isStreamingPipeline()) { + if (!state.isTerminal()) { + jssc.stop(false, gracefully); + state = State.CANCELLED; + } + return state; + } else { + // Batch is currently blocking so + // there is no way to cancel a batch job + // will be handled at BEAM-1000 + throw new UnsupportedOperationException( + "Spark runner EvaluationContext does not support cancel."); + } } @Override public State waitUntilFinish() { - return waitUntilFinish(Duration.millis(-1)); + return waitUntilFinish(Duration.ZERO); } @Override public State waitUntilFinish(Duration duration) { if (isStreamingPipeline()) { - throw new UnsupportedOperationException( - "Spark runner EvaluationContext does not support 
waitUntilFinish for streaming " - + "pipelines."); + // According to PipelineResult: Provide a value less than 1 ms for an infinite wait + if (duration.getMillis() < 1L) { + jssc.awaitTermination(); + state = State.DONE; + } else { + jssc.awaitTermination(duration.getMillis()); + // According to PipelineResult: The final state of the pipeline or null on timeout + if (jssc.getState().equals(StreamingContextState.STOPPED)) { + state = State.DONE; + } else { + return null; + } + } + return state; } else { // This is no-op, since Spark runner in batch is blocking. // It needs to be updated once SparkRunner supports non-blocking execution: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index 01398e4..af90ff1 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -73,8 +73,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration); - ctxt = new EvaluationContext(jsc, pipeline, jssc, - options.getTimeout()); + ctxt = new EvaluationContext(jsc, pipeline, jssc); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java index ec75eb7..d40bcff 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java @@ -68,7 +68,8 @@ public class EmptyStreamAssertionTest implements Serializable { .apply(Window.<String>into(FixedWindows.of(windowDuration))); try { - PAssertStreaming.runAndAssertContents(pipeline, output, new String[0]); + PAssertStreaming.runAndAssertContents(pipeline, output, new String[0], + Duration.standardSeconds(1L)); } catch (AssertionError e) { assertTrue("Expected error message: " + EXPECTED_ERR + " but got: " + e.getMessage(), e.getMessage().equals(EXPECTED_ERR)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index f69bd7f..3e75b18 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -76,7 +76,7 @@ public class 
FlattenStreamingTest { PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2); PCollection<String> union = list.apply(Flatten.<String>pCollections()); - PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION); + PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L)); } @Test @@ -95,7 +95,7 @@ public class FlattenStreamingTest { PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2); PCollection<String> union = list.apply(Flatten.<String>pCollections()); - PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION); + PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index 6b2486b..d55ed39 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -118,7 +118,7 @@ public class KafkaStreamingTest { .apply(ParDo.of(new FormatKVFn())) .apply(Distinct.<String>create()); - PAssertStreaming.runAndAssertContents(p, deduped, expected); + PAssertStreaming.runAndAssertContents(p, deduped, expected, Duration.standardSeconds(1L)); } @Test @@ -143,10 +143,6 @@ public class KafkaStreamingTest { // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec, // so to be on the safe side we'll set to 750 msec. 
options.setMinReadTimeMillis(750L); - // run for more than 1 batch interval, so that reading of latest is attempted in the - // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read - // in the second interval. - options.setTimeout(Duration.standardSeconds(3).getMillis()); //------- test: read and format. Pipeline p = Pipeline.create(options); @@ -168,7 +164,10 @@ public class KafkaStreamingTest { .apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration))) .apply(ParDo.of(new FormatKVFn())); - PAssertStreaming.runAndAssertContents(p, formatted, expected); + // run for more than 1 batch interval, so that reading of latest is attempted in the + // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read + // in the second interval. + PAssertStreaming.runAndAssertContents(p, formatted, expected, Duration.standardSeconds(3)); } private static void produce(String topic, Map<String, String> messages) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index af93d84..b57787f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -163,7 +163,8 @@ public class ResumeFromCheckpointStreamingTest { // requires a graceful stop so that checkpointing of the first run would finish successfully // before stopping and attempting to 
resume. - return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED); + return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED, + Duration.standardSeconds(1L)); } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index 4c503c4..9a15ff2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -67,8 +67,6 @@ public class SimpleStreamingWordCountTest implements Serializable { // override defaults options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis()); - // graceful stop is on, so no worries about the timeout and window being equal - options.setTimeout(windowDuration.getMillis()); Pipeline pipeline = Pipeline.create(options); @@ -80,6 +78,6 @@ public class SimpleStreamingWordCountTest implements Serializable { .apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); - PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS); + PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS, windowDuration); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java index 496735d..23aca43 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ public final class PAssertStreaming implements Serializable { public static <T> EvaluationResult runAndAssertContents(Pipeline p, PCollection<T> actual, T[] expected, + Duration timeout, boolean stopGracefully) { // Because PAssert does not support non-global windowing, but all our data is in one window, // we set up the assertion directly. @@ -68,6 +70,7 @@ public final class PAssertStreaming implements Serializable { // run the pipeline. EvaluationResult res = (EvaluationResult) p.run(); + res.waitUntilFinish(timeout); res.close(stopGracefully); // validate assertion succeeded (at least once). 
int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); @@ -86,8 +89,9 @@ public final class PAssertStreaming implements Serializable { */ public static <T> EvaluationResult runAndAssertContents(Pipeline p, PCollection<T> actual, - T[] expected) { - return runAndAssertContents(p, actual, expected, true); + T[] expected, + Duration timeout) { + return runAndAssertContents(p, actual, expected, timeout, true); } private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java index 1c0b68a..f74c74a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java @@ -31,7 +31,6 @@ public class SparkTestPipelineOptionsForStreaming extends SparkTestPipelineOptio @Override protected void before() throws Throwable { super.before(); - options.setTimeout(1000L); options.setStreaming(true); }
