Make CreateStream a TimestampedValue Source. Also adds a nicer DSL for constructing CreateStream.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62ddca63 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62ddca63 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62ddca63 Branch: refs/heads/master Commit: 62ddca638ae76918f3a0a5fac4ca8fbdb1c7fb9a Parents: 123f482 Author: Sela <[email protected]> Authored: Mon Feb 27 23:35:10 2017 +0200 Committer: Sela <[email protected]> Committed: Wed Mar 1 00:18:09 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/io/CreateStream.java | 66 ++++++++++++----- .../streaming/StreamingTransformTranslator.java | 26 +++++-- .../runners/spark/SparkPipelineStateTest.java | 2 +- .../translation/streaming/CreateStreamTest.java | 76 ++++++++------------ .../streaming/TrackStreamingSourcesTest.java | 22 +++--- 5 files changed, 111 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java index 70784f1..f2e0bb3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java @@ -19,11 +19,16 @@ package org.apache.beam.runners.spark.io; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.Lists; import java.util.Arrays; +import java.util.Collections; import java.util.Deque; import java.util.LinkedList; +import java.util.List; import java.util.Queue; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; +import 
org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowingStrategy; @@ -83,20 +88,22 @@ import org.joda.time.Instant; public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> { private final Duration batchInterval; - private final Instant initialSystemTime; - private final Queue<Iterable<T>> batches = new LinkedList<>(); + private final Queue<Iterable<TimestampedValue<T>>> batches = new LinkedList<>(); private final Deque<SparkWatermarks> times = new LinkedList<>(); + private final Coder<T> coder; + private Instant initialSystemTime; private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test purposes. - private CreateStream(Duration batchInterval, Instant initialSystemTime) { + private CreateStream(Duration batchInterval, Instant initialSystemTime, Coder<T> coder) { this.batchInterval = batchInterval; this.initialSystemTime = initialSystemTime; + this.coder = coder; } /** Set the batch interval for the stream. */ - public static <T> CreateStream<T> withBatchInterval(Duration batchInterval) { - return new CreateStream<>(batchInterval, new Instant(0)); + public static <T> CreateStream<T> of(Coder<T> coder, Duration batchInterval) { + return new CreateStream<>(batchInterval, new Instant(0), coder); } /** @@ -104,25 +111,46 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> { * This is backed by a {@link Queue} so stream input order would keep the population order (FIFO). */ @SafeVarargs - public final CreateStream<T> nextBatch(T... batchElements) { + public final CreateStream<T> nextBatch(TimestampedValue<T>... batchElements) { // validate timestamps if timestamped elements. 
- for (T element: batchElements) { - if (element instanceof TimestampedValue) { - TimestampedValue timestampedValue = (TimestampedValue) element; - checkArgument( - timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Elements must have timestamps before %s. Got: %s", - BoundedWindow.TIMESTAMP_MAX_VALUE, - timestampedValue.getTimestamp()); - } + for (TimestampedValue<T> element: batchElements) { + TimestampedValue timestampedValue = (TimestampedValue) element; + checkArgument( + timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Elements must have timestamps before %s. Got: %s", + BoundedWindow.TIMESTAMP_MAX_VALUE, + timestampedValue.getTimestamp()); } batches.offer(Arrays.asList(batchElements)); return this; } + /** + * For non-timestamped elements. + */ + @SafeVarargs + public final CreateStream<T> nextBatch(T... batchElements) { + List<TimestampedValue<T>> timestamped = Lists.newArrayListWithCapacity(batchElements.length); + // as TimestampedValue. + for (T element: batchElements) { + timestamped.add(TimestampedValue.atMinimumTimestamp(element)); + } + batches.offer(timestamped); + return this; + } + + /** + * Adds an empty batch. + */ + public CreateStream<T> emptyBatch() { + batches.offer(Collections.<TimestampedValue<T>>emptyList()); + return this; + } + /** Set the initial synchronized processing time. */ public CreateStream<T> initialSystemTimeAt(Instant initialSystemTime) { - return new CreateStream<>(batchInterval, initialSystemTime); + this.initialSystemTime = initialSystemTime; + return this; } /** @@ -160,7 +188,7 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> { } /** Get the underlying queue representing the mock stream of micro-batches. 
*/ - public Queue<Iterable<T>> getBatches() { + public Queue<Iterable<TimestampedValue<T>>> getBatches() { return batches; } @@ -178,4 +206,8 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> { input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED); } + @Override + protected Coder<T> getDefaultOutputCoder() throws CannotProvideCoderException { + return coder; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index fc98781..4a07741 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; +import javax.annotation.Nonnull; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -67,6 +68,8 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import 
org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CombineFnUtil; @@ -75,6 +78,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; @@ -87,6 +91,7 @@ import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; + /** * Supports translation between a Beam transform, and Spark's operations on DStreams. */ @@ -127,14 +132,25 @@ final class StreamingTransformTranslator { public void evaluate(CreateStream<T> transform, EvaluationContext context) { Coder<T> coder = context.getOutput(transform).getCoder(); JavaStreamingContext jssc = context.getStreamingContext(); - Queue<Iterable<T>> values = transform.getBatches(); - WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder = - WindowedValue.getValueOnlyCoder(coder); + Queue<Iterable<TimestampedValue<T>>> values = transform.getBatches(); + WindowedValue.FullWindowedValueCoder<T> windowCoder = + WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE); // create the DStream from queue. 
Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>(); - for (Iterable<T> v : values) { + for (Iterable<TimestampedValue<T>> tv : values) { Iterable<WindowedValue<T>> windowedValues = - Iterables.transform(v, WindowingHelpers.<T>windowValueFunction()); + Iterables.transform( + tv, + new com.google.common.base.Function<TimestampedValue<T>, WindowedValue<T>>() { + @Override + public WindowedValue<T> apply(@Nonnull TimestampedValue<T> timestampedValue) { + return WindowedValue.of( + timestampedValue.getValue(), + timestampedValue.getTimestamp(), + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING); + } + }); JavaRDD<WindowedValue<T>> rdd = jssc.sparkContext() .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)) http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java index c856203..37a201c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java @@ -75,7 +75,7 @@ public class SparkPipelineStateTest implements Serializable { private PTransform<PBegin, PCollection<String>> getValues(final SparkPipelineOptions options) { return options.isStreaming() - ? CreateStream.<String>withBatchInterval(Duration.millis(1)).nextBatch("one", "two") + ? 
CreateStream.of(StringUtf8Coder.of(), Duration.millis(1)).nextBatch("one", "two") : Create.of("one", "two"); } http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java index f2783a1..ff77535 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java @@ -32,10 +32,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.Values; @@ -83,9 +81,9 @@ public class CreateStreamTest implements Serializable { public void testLateDataAccumulating() throws IOException { Pipeline p = pipelineRule.createPipeline(); Instant instant = new Instant(0); - CreateStream<TimestampedValue<Integer>> source = - CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration()) - .nextBatch() + CreateStream<Integer> source = + CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + .emptyBatch() .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6))) .nextBatch( TimestampedValue.of(1, instant), @@ -104,8 +102,7 @@ public class 
CreateStreamTest implements Serializable { TimestampedValue.of(-3, instant)); PCollection<Integer> windowed = p - .apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of())) - .apply(ParDo.of(new OnlyValue<Integer>())) + .apply(source) .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering( AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() @@ -156,8 +153,8 @@ public class CreateStreamTest implements Serializable { @Test public void testDiscardingMode() throws IOException { Pipeline p = pipelineRule.createPipeline(); - CreateStream<TimestampedValue<String>> source = - CreateStream.<TimestampedValue<String>>withBatchInterval(pipelineRule.batchDuration()) + CreateStream<String> source = + CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration()) .nextBatch( TimestampedValue.of("firstPane", new Instant(100)), TimestampedValue.of("alsoFirstPane", new Instant(200))) @@ -172,8 +169,7 @@ public class CreateStreamTest implements Serializable { FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L)); Duration allowedLateness = Duration.millis(5000L); PCollection<String> values = - p.apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())) - .apply(ParDo.of(new OnlyValue<String>())) + p.apply(source) .apply( Window.<String>into(windowFn) .triggering( @@ -207,9 +203,9 @@ public class CreateStreamTest implements Serializable { public void testFirstElementLate() throws IOException { Pipeline p = pipelineRule.createPipeline(); Instant lateElementTimestamp = new Instant(-1_000_000); - CreateStream<TimestampedValue<String>> source = - CreateStream.<TimestampedValue<String>>withBatchInterval(pipelineRule.batchDuration()) - .nextBatch() + CreateStream<String> source = + CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration()) + .emptyBatch() .advanceWatermarkForNextBatch(new Instant(0)) .nextBatch( TimestampedValue.of("late", 
lateElementTimestamp), @@ -219,8 +215,6 @@ public class CreateStreamTest implements Serializable { FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L)); Duration allowedLateness = Duration.millis(5000L); PCollection<String> values = p.apply(source) - .setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())) - .apply(ParDo.of(new OnlyValue<String>())) .apply(Window.<String>into(windowFn).triggering(DefaultTrigger.of()) .discardingFiredPanes() .withAllowedLateness(allowedLateness)) @@ -243,8 +237,8 @@ public class CreateStreamTest implements Serializable { public void testElementsAtAlmostPositiveInfinity() throws IOException { Pipeline p = pipelineRule.createPipeline(); Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); - CreateStream<TimestampedValue<String>> source = - CreateStream.<TimestampedValue<String>>withBatchInterval(pipelineRule.batchDuration()) + CreateStream<String> source = + CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration()) .nextBatch( TimestampedValue.of("foo", endOfGlobalWindow), TimestampedValue.of("bar", endOfGlobalWindow)) @@ -252,8 +246,6 @@ public class CreateStreamTest implements Serializable { FixedWindows windows = FixedWindows.of(Duration.standardHours(6)); PCollection<String> windowedValues = p.apply(source) - .setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())) - .apply(ParDo.of(new OnlyValue<String>())) .apply(Window.<String>into(windows)) .apply(WithKeys.<Integer, String>of(1)) .apply(GroupByKey.<Integer, String>create()) @@ -270,16 +262,16 @@ public class CreateStreamTest implements Serializable { public void testMultipleStreams() throws IOException { Pipeline p = pipelineRule.createPipeline(); CreateStream<String> source = - CreateStream.<String>withBatchInterval(pipelineRule.batchDuration()) + CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration()) .nextBatch("foo", "bar") .advanceNextBatchWatermarkToInfinity(); CreateStream<Integer> other = 
- CreateStream.<Integer>withBatchInterval(pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) .nextBatch(1, 2, 3, 4) .advanceNextBatchWatermarkToInfinity(); PCollection<String> createStrings = - p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of()) + p.apply("CreateStrings", source) .apply("WindowStrings", Window.<String>triggering(AfterPane.elementCountAtLeast(2)) .withAllowedLateness(Duration.ZERO) @@ -287,7 +279,7 @@ public class CreateStreamTest implements Serializable { PAssert.that(createStrings).containsInAnyOrder("foo", "bar"); PCollection<Integer> createInts = - p.apply("CreateInts", other).setCoder(VarIntCoder.of()) + p.apply("CreateInts", other) .apply("WindowInts", Window.<Integer>triggering(AfterPane.elementCountAtLeast(4)) .withAllowedLateness(Duration.ZERO) @@ -301,18 +293,18 @@ public class CreateStreamTest implements Serializable { public void testFlattenedWithWatermarkHold() throws IOException { Pipeline p = pipelineRule.createPipeline(); Instant instant = new Instant(0); - CreateStream<TimestampedValue<Integer>> source1 = - CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration()) - .nextBatch() + CreateStream<Integer> source1 = + CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + .emptyBatch() .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5))) .nextBatch( TimestampedValue.of(1, instant), TimestampedValue.of(2, instant), TimestampedValue.of(3, instant)) .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10))); - CreateStream<TimestampedValue<Integer>> source2 = - CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration()) - .nextBatch() + CreateStream<Integer> source2 = + CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + .emptyBatch() .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1))) .nextBatch( TimestampedValue.of(4, instant)) @@ -322,15 
+314,13 @@ public class CreateStreamTest implements Serializable { .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5))); PCollection<Integer> windowed1 = p - .apply(source1).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of())) - .apply(ParDo.of(new OnlyValue<Integer>())) + .apply(source1) .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))) .triggering(AfterWatermark.pastEndOfWindow()) .accumulatingFiredPanes() .withAllowedLateness(Duration.ZERO)); PCollection<Integer> windowed2 = p - .apply(source2).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of())) - .apply(ParDo.of(new OnlyValue<Integer>())) + .apply(source2) .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))) .triggering(AfterWatermark.pastEndOfWindow()) .accumulatingFiredPanes() @@ -352,8 +342,8 @@ public class CreateStreamTest implements Serializable { @Test public void testElementAtPositiveInfinityThrows() { - CreateStream<TimestampedValue<Integer>> source = - CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration()) + CreateStream<Integer> source = + CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) .nextBatch(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L))); thrown.expect(IllegalArgumentException.class); source.nextBatch(TimestampedValue.of(1, BoundedWindow.TIMESTAMP_MAX_VALUE)); @@ -362,7 +352,7 @@ public class CreateStreamTest implements Serializable { @Test public void testAdvanceWatermarkNonMonotonicThrows() { CreateStream<Integer> source = - CreateStream.<Integer>withBatchInterval(pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) .advanceWatermarkForNextBatch(new Instant(0L)); thrown.expect(IllegalArgumentException.class); source.advanceWatermarkForNextBatch(new Instant(-1L)); @@ -371,19 +361,9 @@ public class CreateStreamTest implements Serializable { @Test public void 
testAdvanceWatermarkEqualToPositiveInfinityThrows() { CreateStream<Integer> source = - CreateStream.<Integer>withBatchInterval(pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) .advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)); thrown.expect(IllegalArgumentException.class); source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE); } - - private static class OnlyValue<T> extends DoFn<TimestampedValue<T>, T> { - - OnlyValue() { } - - @ProcessElement - public void emitTimestampedValue(ProcessContext c) { - c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/62ddca63/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index 32cef7e..b181a04 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -77,11 +77,11 @@ public class TrackStreamingSourcesTest { Pipeline p = Pipeline.create(options); CreateStream<Integer> emptyStream = - CreateStream.<Integer>withBatchInterval( - Duration.millis(options.getBatchIntervalMillis())).nextBatch(); + CreateStream.of( + VarIntCoder.of(), + Duration.millis(options.getBatchIntervalMillis())).emptyBatch(); - p.apply(emptyStream).setCoder(VarIntCoder.of()) - .apply(ParDo.of(new PassthroughFn<>())); + p.apply(emptyStream).apply(ParDo.of(new PassthroughFn<>())); p.traverseTopologically(new StreamingSourceTracker(jssc, p, 
ParDo.Bound.class, 0)); assertThat(StreamingSourceTracker.numAssertions, equalTo(1)); @@ -97,14 +97,16 @@ public class TrackStreamingSourcesTest { Pipeline p = Pipeline.create(options); CreateStream<Integer> queueStream1 = - CreateStream.<Integer>withBatchInterval( - Duration.millis(options.getBatchIntervalMillis())).nextBatch(); + CreateStream.of( + VarIntCoder.of(), + Duration.millis(options.getBatchIntervalMillis())).emptyBatch(); CreateStream<Integer> queueStream2 = - CreateStream.<Integer>withBatchInterval( - Duration.millis(options.getBatchIntervalMillis())).nextBatch(); + CreateStream.of( + VarIntCoder.of(), + Duration.millis(options.getBatchIntervalMillis())).emptyBatch(); - PCollection<Integer> pcol1 = p.apply(queueStream1).setCoder(VarIntCoder.of()); - PCollection<Integer> pcol2 = p.apply(queueStream2).setCoder(VarIntCoder.of()); + PCollection<Integer> pcol1 = p.apply(queueStream1); + PCollection<Integer> pcol2 = p.apply(queueStream2); PCollection<Integer> flattened = PCollectionList.of(pcol1).and(pcol2).apply(Flatten.<Integer>pCollections()); flattened.apply(ParDo.of(new PassthroughFn<>()));
