Add multi stream and flattened stream tests. Misc. fixups.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24ab6053 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24ab6053 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24ab6053 Branch: refs/heads/master Commit: 24ab60538697b284b915157e08218de2e1e42f7b Parents: da5f849 Author: Sela <[email protected]> Authored: Mon Feb 20 00:14:39 2017 +0200 Committer: Sela <[email protected]> Committed: Wed Mar 1 00:18:02 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/TestSparkRunner.java | 8 +- .../beam/runners/spark/io/CreateStream.java | 41 +++++ .../translation/streaming/CreateStreamTest.java | 161 ++++++++++++------- 3 files changed, 146 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 24bc038..985f75d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -24,7 +24,8 @@ import static org.hamcrest.Matchers.is; import java.io.File; import java.io.IOException; import org.apache.beam.runners.core.UnboundedReadFromBoundedSource; -import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; +import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; +import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; import 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; @@ -118,8 +119,9 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { long timeout = sparkOptions.getForcedTimeout(); SparkPipelineResult result = null; try { - // clear state of Accumulators and Aggregators. - AccumulatorSingleton.clear(); + // clear state of Aggregators, Metrics and Watermarks. + AggregatorsAccumulator.clear(); + SparkMetricsContainer.clear(); GlobalWatermarkHolder.clear(); TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java index 2149372..70784f1 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java @@ -37,6 +37,47 @@ import org.joda.time.Instant; /** * Create an input stream from Queue. For SparkRunner tests only. * + * <p>To properly compose a stream of micro-batches with their Watermarks, please keep in mind + * that eventually there are two queues here - one for batches and another for Watermarks. + * + * <p>While both queues advance according to Spark's batch-interval, there is a slight difference + * in how data is pushed into the stream compared to the advancement of Watermarks: Watermarks + * advance on the onBatchCompleted hook call, so if you'd want to set the watermark advance for a specific + * batch it should be called before that batch. 
+ * Also keep in mind that being a queue that is polled per batch interval, if there is a need to + * "hold" the same Watermark without advancing it, it should be stated explicitly or the Watermark + * will advance as soon as it can (in the next batch completed hook). + * + * <p>Example 1: + * + * {@code + * CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration) + * .nextBatch( + * TimestampedValue.of("foo", endOfGlobalWindow), + * TimestampedValue.of("bar", endOfGlobalWindow)) + * .advanceNextBatchWatermarkToInfinity(); + * } + * The first batch will see the default start-of-time WM of + * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see + * the end-of-time WM {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + * + * <p>Example 2: + * + * {@code + * CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration) + * .nextBatch( + * TimestampedValue.of(1, instant)) + * .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20))) + * .nextBatch( + * TimestampedValue.of(2, instant)) + * .nextBatch( + * TimestampedValue.of(3, instant)) + * .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30))) + * } + * The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM. + * The third batch will see the WM advanced to +30 min, because this is the next advancement of the WM + * regardless of where it was called in the construction of CreateStream. + * //TODO: write a proper Builder enforcing all those rules mentioned. * @param <T> stream type. 
*/ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java index 0cb33ab..9ee5cc5 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -68,6 +69,9 @@ import org.junit.rules.TestName; * <p>Since Spark is a micro-batch engine, and will process any test-sized input * within the same (first) batch, it is important to make sure inputs are ingested across * micro-batches using {@link org.apache.spark.streaming.dstream.QueueInputDStream}. + * This test suite uses {@link CreateStream} to construct such + * {@link org.apache.spark.streaming.dstream.QueueInputDStream} and advance the system's WMs. + * //TODO: add synchronized/processing time trigger. 
*/ public class CreateStreamTest implements Serializable { @@ -161,29 +165,6 @@ public class CreateStreamTest implements Serializable { p.run(); } -// @Test -// @Category({NeedsRunner.class, UsesTestStream.class}) -// public void testProcessingTimeTrigger() { -// TestStream<Long> source = TestStream.create(VarLongCoder.of()) -// .addElements(TimestampedValue.of(1L, new Instant(1000L)), -// TimestampedValue.of(2L, new Instant(2000L))) -// .advanceProcessingTime(Duration.standardMinutes(12)) -// .addElements(TimestampedValue.of(3L, new Instant(3000L))) -// .advanceProcessingTime(Duration.standardMinutes(6)) -// .advanceWatermarkToInfinity(); -// -// PCollection<Long> sum = p.apply(source) -// .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow() -// .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() -// .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes() -// .withAllowedLateness(Duration.ZERO)) -// .apply(Sum.longsGlobally()); -// -// PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L); -// -// p.run(); -// } - @Test public void testDiscardingMode() throws IOException { SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); @@ -249,7 +230,7 @@ public class CreateStreamTest implements Serializable { CreateStream<TimestampedValue<String>> source = CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration) .nextBatch() - .advanceWatermarkForNextBatch(new Instant(-1_000_000)) + .advanceWatermarkForNextBatch(new Instant(0)) .nextBatch( TimestampedValue.of("late", lateElementTimestamp), TimestampedValue.of("onTime", new Instant(100))) @@ -268,8 +249,9 @@ public class CreateStreamTest implements Serializable { .apply(Values.<Iterable<String>>create()) .apply(Flatten.<String>iterables()); - //TODO: empty panes do not emmit anything so Spark won't evaluate an "empty" assertion. 
-// PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty(); + PAssert.that(values) + .inWindow(windowFn.assignWindow(lateElementTimestamp)) + .empty(); PAssert.that(values) .inWindow(windowFn.assignWindow(new Instant(100))) .containsInAnyOrder("onTime"); @@ -308,40 +290,97 @@ public class CreateStreamTest implements Serializable { p.run(); } -// @Test -// public void testMultipleStreams() throws IOException { -// SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); -// Pipeline p = Pipeline.create(options); -// options.setJobName(testName.getMethodName()); -// Duration batchDuration = Duration.millis(options.getBatchIntervalMillis()); -// -// CreateStream<String> source = -// CreateStream.<String>withBatchInterval(batchDuration) -// .nextBatch("foo", "bar").advanceWatermarkForNextBatch(new Instant(100)) -// .nextBatch().advanceNextBatchWatermarkToInfinity(); -// -//// CreateStream<Integer> other = -//// CreateStream.<Integer>withBatchInterval(batchDuration) -//// .nextBatch(1, 2, 3, 4) -//// .advanceNextBatchWatermarkToInfinity(); -// -// PCollection<String> createStrings = -// p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of()) -// .apply("WindowStrings", -// Window.<String>triggering(AfterPane.elementCountAtLeast(2)) -// .withAllowedLateness(Duration.ZERO) -// .accumulatingFiredPanes()); -// PAssert.that(createStrings).containsInAnyOrder("foo", "bar"); -//// PCollection<Integer> createInts = -//// p.apply("CreateInts", other).setCoder(VarIntCoder.of()) -//// .apply("WindowInts", -//// Window.<Integer>triggering(AfterPane.elementCountAtLeast(4)) -//// .withAllowedLateness(Duration.ZERO) -//// .accumulatingFiredPanes()); -//// PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4); -// -// p.run(); -// } + @Test + public void testMultipleStreams() throws IOException { + SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + Pipeline p = 
Pipeline.create(options); + options.setJobName(testName.getMethodName()); + Duration batchDuration = Duration.millis(options.getBatchIntervalMillis()); + + CreateStream<String> source = + CreateStream.<String>withBatchInterval(batchDuration) + .nextBatch("foo", "bar") + .advanceNextBatchWatermarkToInfinity(); + CreateStream<Integer> other = + CreateStream.<Integer>withBatchInterval(batchDuration) + .nextBatch(1, 2, 3, 4) + .advanceNextBatchWatermarkToInfinity(); + + PCollection<String> createStrings = + p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of()) + .apply("WindowStrings", + Window.<String>triggering(AfterPane.elementCountAtLeast(2)) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()); + PAssert.that(createStrings).containsInAnyOrder("foo", "bar"); + + PCollection<Integer> createInts = + p.apply("CreateInts", other).setCoder(VarIntCoder.of()) + .apply("WindowInts", + Window.<Integer>triggering(AfterPane.elementCountAtLeast(4)) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()); + PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4); + + p.run(); + } + + @Test + public void testFlattenedWithWatermarkHold() throws IOException { + SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + Pipeline p = Pipeline.create(options); + options.setJobName(testName.getMethodName()); + Duration batchDuration = Duration.millis(options.getBatchIntervalMillis()); + + Instant instant = new Instant(0); + CreateStream<TimestampedValue<Integer>> source1 = + CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration) + .nextBatch() + .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5))) + .nextBatch( + TimestampedValue.of(1, instant), + TimestampedValue.of(2, instant), + TimestampedValue.of(3, instant)) + .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10))); + CreateStream<TimestampedValue<Integer>> source2 = + 
CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration) + .nextBatch() + .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1))) + .nextBatch( + TimestampedValue.of(4, instant)) + .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(2))) + .nextBatch( + TimestampedValue.of(5, instant)) + .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5))); + + PCollection<Integer> windowed1 = p + .apply(source1).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of())) + .apply(ParDo.of(new OnlyValue<Integer>())) + .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))) + .triggering(AfterWatermark.pastEndOfWindow()) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.ZERO)); + PCollection<Integer> windowed2 = p + .apply(source2).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of())) + .apply(ParDo.of(new OnlyValue<Integer>())) + .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))) + .triggering(AfterWatermark.pastEndOfWindow()) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.ZERO)); + + PCollectionList<Integer> pCollectionList = PCollectionList.of(windowed1).and(windowed2); + PCollection<Integer> flattened = pCollectionList.apply(Flatten.<Integer>pCollections()); + PCollection<Integer> triggered = flattened + .apply(WithKeys.<Integer, Integer>of(1)) + .apply(GroupByKey.<Integer, Integer>create()) + .apply(Values.<Iterable<Integer>>create()) + .apply(Flatten.<Integer>iterables()); + + IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L))); + PAssert.that(triggered).inOnTimePane(window).containsInAnyOrder(1, 2, 3, 4, 5); + + p.run(); + } @Test public void testElementAtPositiveInfinityThrows() { @@ -378,7 +417,7 @@ public class CreateStreamTest implements Serializable { OnlyValue() { } @ProcessElement - public void onlyValue(ProcessContext c) { + public void 
emitTimestampedValue(ProcessContext c) { c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); } }
