[BEAM-2408] Fix watermark emission in Flink UnboundedSourceWrapper. Before this change, setNextWatermarkTimer() was never called when the source had multiple readers.
This also adds a test for watermark emission to UnboundedSourceWrapperTest. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8f4fa439 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8f4fa439 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8f4fa439 Branch: refs/heads/DSL_SQL Commit: 8f4fa4394609504348ada988948f0a1386d54c0e Parents: c1dc8f5 Author: Aljoscha Krettek <[email protected]> Authored: Mon Jun 5 15:46:26 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Jun 7 19:43:11 2017 +0200 ---------------------------------------------------------------------- .../streaming/io/UnboundedSourceWrapper.java | 2 + .../streaming/UnboundedSourceWrapperTest.java | 111 ++++++++++++++++++- 2 files changed, 112 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8f4fa439/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index ec21699..6055a43 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -283,6 +283,8 @@ public class UnboundedSourceWrapper< } } + setNextWatermarkTimer(this.runtimeContext); + // a flag telling us whether any of the localReaders had data // if no reader had data, sleep for bit boolean hadData = false; 
http://git-wip-us.apache.org/repos/asf/beam/blob/8f4fa439/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 716e71d..bb2be60 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -21,11 +21,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.base.Joiner; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; @@ -45,6 +48,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.OutputTag; +import org.joda.time.Instant; import org.junit.Test; import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; @@ -88,7 +92,7 @@ public class UnboundedSourceWrapperTest { * If numSplits > numTasks the source has one source will manage multiple readers. 
*/ @Test - public void testReaders() throws Exception { + public void testValueEmission() throws Exception { final int numElements = 20; final Object checkpointLock = new Object(); PipelineOptions options = PipelineOptionsFactory.create(); @@ -164,6 +168,111 @@ public class UnboundedSourceWrapperTest { } /** + * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source. + * If numSplits > numTasks the source has one source will manage multiple readers. + * + * <p>This test verifies that watermark are correctly forwarded. + */ + @Test(timeout = 30_000) + public void testWatermarkEmission() throws Exception { + final int numElements = 500; + final Object checkpointLock = new Object(); + PipelineOptions options = PipelineOptionsFactory.create(); + + // this source will emit exactly NUM_ELEMENTS across all parallel readers, + // afterwards it will stall. We check whether we also receive NUM_ELEMENTS + // elements later. + TestCountingSource source = new TestCountingSource(numElements); + UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = + new UnboundedSourceWrapper<>("stepName", options, source, numSplits); + + assertEquals(numSplits, flinkWrapper.getSplitSources().size()); + + final StreamSource<WindowedValue< + ValueWithRecordId<KV<Integer, Integer>>>, + UnboundedSourceWrapper< + KV<Integer, Integer>, + TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); + + final AbstractStreamOperatorTestHarness< + WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> testHarness = + new AbstractStreamOperatorTestHarness<>( + sourceOperator, + numTasks /* max parallelism */, + numTasks /* parallelism */, + 0 /* subtask index */); + + testHarness.setProcessingTime(Instant.now().getMillis()); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + + final ConcurrentLinkedQueue<Object> caughtExceptions = new ConcurrentLinkedQueue<>(); + + // use the AtomicBoolean just 
for the set()/get() functionality for communicating + // with the outer Thread + final AtomicBoolean seenWatermark = new AtomicBoolean(false); + + Thread sourceThread = new Thread() { + @Override + public void run() { + try { + testHarness.open(); + sourceOperator.run(checkpointLock, + new TestStreamStatusMaintainer(), + new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() { + + @Override + public void emitWatermark(Watermark watermark) { + if (watermark.getTimestamp() >= numElements / 2) { + seenWatermark.set(true); + } + } + + @Override + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) { + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + } + + @Override + public void collect(StreamRecord<WindowedValue< + ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) { + } + + @Override + public void close() { + + } + }); + } catch (Exception e) { + System.out.println("Caught exception: " + e); + caughtExceptions.add(e); + } + } + }; + + sourceThread.start(); + + while (true) { + if (!caughtExceptions.isEmpty()) { + fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions)); + } + if (seenWatermark.get()) { + break; + } + Thread.sleep(10); + + // need to advance this so that the watermark timers in the source wrapper fire + testHarness.setProcessingTime(Instant.now().getMillis()); + } + + sourceOperator.cancel(); + sourceThread.join(); + } + + + /** * Verify that snapshot/restore work as expected. We bring up a source and cancel * after seeing a certain number of elements. Then we snapshot that source, * bring up a completely new source that we restore from the snapshot and verify
