[BEAM-769] Spark: Make graceful stop the default. Streaming tests fail with "nothing processed" when the runtime environment is slow, because the timeout is hit before processing is done.
Keep "pumping-in" the last batch in a mocked stream to handle overflowing batches in case of a graceful stop. Change tests accordingly. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e43228c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e43228c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e43228c9 Branch: refs/heads/gearpump-runner Commit: e43228c92cd9dd8a81a28940d419b721a2aeb2d8 Parents: a9a41eb Author: Sela <[email protected]> Authored: Fri Oct 21 01:20:33 2016 +0300 Committer: Dan Halperin <[email protected]> Committed: Mon Oct 24 07:07:27 2016 -0700 ---------------------------------------------------------------------- .../streaming/StreamingEvaluationContext.java | 11 ++++++++--- .../translation/streaming/EmptyStreamAssertionTest.java | 10 +++++++--- .../streaming/ResumeFromCheckpointStreamingTest.java | 2 +- .../streaming/SimpleStreamingWordCountTest.java | 1 - .../translation/streaming/utils/PAssertStreaming.java | 6 +++--- 5 files changed, 19 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java index 2652f2b..49afa26 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java @@ -92,6 +92,7 @@ public class 
StreamingEvaluationContext extends EvaluationContext { WindowedValue.getValueOnlyCoder(coder); // create the DStream from queue Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>(); + JavaRDD<WindowedValue<T>> lastRDD = null; for (Iterable<T> v : values) { Iterable<WindowedValue<T>> windowedValues = Iterables.transform(v, WindowingHelpers.<T>windowValueFunction()); @@ -99,10 +100,14 @@ public class StreamingEvaluationContext extends EvaluationContext { CoderHelpers.toByteArrays(windowedValues, windowCoder)).map( CoderHelpers.fromByteFunction(windowCoder)); rddQueue.offer(rdd); + lastRDD = rdd; } - // create dstream from queue, one at a time, no defaults - // mainly for unit test so no reason to have this configurable - dStream = jssc.queueStream(rddQueue, true); + // create dstream from queue, one at a time, + // with last as default in case batches repeat (graceful stops for example). + // if the stream is empty, avoid creating a default empty RDD. + // mainly for unit test so no reason to have this configurable. + dStream = lastRDD != null ? 
jssc.queueStream(rddQueue, true, lastRDD) + : jssc.queueStream(rddQueue, true); } return dStream; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java index 1560c66..4f2a7c6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; - +import org.junit.rules.TemporaryFolder; /** @@ -48,11 +48,15 @@ public class EmptyStreamAssertionTest implements Serializable { + " but: was <0>"; @Rule + public TemporaryFolder checkpointParentDir = new TemporaryFolder(); + + @Rule public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming(); @Test - public void testFixedWindows() throws Exception { - SparkPipelineOptions options = commonOptions.getOptions(); + public void testAssertion() throws Exception { + SparkPipelineOptions options = commonOptions.withTmpCheckpointDir( + checkpointParentDir.newFolder(getClass().getSimpleName())); Duration windowDuration = new Duration(options.getBatchIntervalMillis()); Pipeline pipeline = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index fc7fa34..fd1d11a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -157,7 +157,7 @@ public class ResumeFromCheckpointStreamingTest { // requires a graceful stop so that checkpointing of the first run would finish successfully // before stopping and attempting to resume. - return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED, true); + return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED); } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index 8f2dde3..4bc9a3d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -62,7 +62,6 @@ public class SimpleStreamingWordCountTest implements Serializable { @Test public void testFixedWindows() throws Exception { - SparkPipelineOptions options = commonOptions.withTmpCheckpointDir( checkpointParentDir.newFolder(getClass().getSimpleName())); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java index 3bf1ef6..496735d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -81,13 +81,13 @@ public final class PAssertStreaming implements Serializable { } /** - * Default to stop immediately, useful for most tests except for the once that may require - * to finish writing checkpoints for example. + * Default to stop gracefully so that tests will finish processing even if slower for reasons + * such as a slow runtime environment. */ public static <T> EvaluationResult runAndAssertContents(Pipeline p, PCollection<T> actual, T[] expected) { - return runAndAssertContents(p, actual, expected, false); + return runAndAssertContents(p, actual, expected, true); } private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> {
