Update Watermarks even if a Reader is empty. This ensures that the pipeline will make progress even if a reader stops producing elements.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/550978f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/550978f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/550978f6 Branch: refs/heads/master Commit: 550978f630ee1e4424e981ddc5ff5e89aa8c797d Parents: bfc527d Author: Thomas Groh <[email protected]> Authored: Mon Nov 7 12:59:06 2016 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Nov 8 10:17:13 2016 -0800 ---------------------------------------------------------------------- .../direct/UnboundedReadEvaluatorFactory.java | 6 ++- .../UnboundedReadEvaluatorFactoryTest.java | 47 +++++++++++++------- 2 files changed, 35 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/550978f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index e529088..fb09b3e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import 
org.apache.beam.sdk.util.WindowedValue; @@ -143,12 +144,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // If the reader had no elements available, but the shard is not done, reuse it later resultBuilder.addUnprocessedElements( Collections.<WindowedValue<?>>singleton( - element.withValue( + WindowedValue.timestampedValueInGlobalWindow( UnboundedSourceShard.of( shard.getSource(), shard.getDeduplicator(), reader, - shard.getCheckpoint())))); + shard.getCheckpoint()), + reader.getWatermark()))); } } catch (IOException e) { if (reader != null) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/550978f6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 9a7fec3..8d38275 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -49,7 +49,7 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; @@ -63,11 +63,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.CoderUtils; +import 
org.apache.beam.sdk.util.VarInt; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.Matchers; import org.joda.time.DateTime; -import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.ReadableInstant; import org.junit.Before; @@ -230,7 +230,7 @@ public class UnboundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); // Read with a very slow rate so by the second read there are no more elements PCollection<Long> pcollection = - p.apply(CountingInput.unbounded().withRate(1L, Duration.standardDays(1))); + p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L))); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); @@ -260,6 +260,7 @@ public class UnboundedReadEvaluatorFactoryTest { (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) Iterables.getOnlyElement(result.getUnprocessedElements()); secondEvaluator.processElement(residual); + TransformResult secondResult = secondEvaluator.finishBundle(); // Sanity check that nothing was output (The test would have to run for more than a day to do @@ -268,11 +269,14 @@ public class UnboundedReadEvaluatorFactoryTest { secondOutput.commit(Instant.now()).getElements(), Matchers.<WindowedValue<Long>>emptyIterable()); - // Test that even though the reader produced no outputs, there is still a residual shard. - UnboundedSourceShard<Long, TestCheckpointMark> residualShard = - (UnboundedSourceShard<Long, TestCheckpointMark>) - Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue(); - assertThat(residualShard.getExistingReader(), not(nullValue())); + // Test that even though the reader produced no outputs, there is still a residual shard with + // the updated watermark. 
+ WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed = + (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) + Iterables.getOnlyElement(secondResult.getUnprocessedElements()); + assertThat( + unprocessed.getTimestamp(), Matchers.<ReadableInstant>greaterThan(residual.getTimestamp())); + assertThat(unprocessed.getValue().getExistingReader(), not(nullValue())); } @Test @@ -377,6 +381,8 @@ public class UnboundedReadEvaluatorFactoryTest { } private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> { + private static int getWatermarkCalls = 0; + static int readerClosedCount; static int readerAdvancedCount; private final Coder<T> coder; @@ -398,8 +404,8 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public UnboundedSource.UnboundedReader<T> createReader( - PipelineOptions options, TestCheckpointMark checkpointMark) { - return new TestUnboundedReader(elems); + PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) { + return new TestUnboundedReader(elems, checkpointMark == null ? 
-1 : checkpointMark.index); } @Override @@ -425,9 +431,9 @@ public class UnboundedReadEvaluatorFactoryTest { private final List<T> elems; private int index; - public TestUnboundedReader(List<T> elems) { + public TestUnboundedReader(List<T> elems, int startIndex) { this.elems = elems; - this.index = -1; + this.index = startIndex; } @Override @@ -447,12 +453,13 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public Instant getWatermark() { - return Instant.now(); + getWatermarkCalls++; + return new Instant(index + getWatermarkCalls); } @Override public CheckpointMark getCheckpointMark() { - return new TestCheckpointMark(); + return new TestCheckpointMark(index); } @Override @@ -488,6 +495,12 @@ public class UnboundedReadEvaluatorFactoryTest { } private static class TestCheckpointMark implements CheckpointMark { + final int index; + + private TestCheckpointMark(int index) { + this.index = index; + } + @Override public void finalizeCheckpoint() throws IOException {} @@ -497,13 +510,15 @@ public class UnboundedReadEvaluatorFactoryTest { TestCheckpointMark value, OutputStream outStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException {} + throws CoderException, IOException { + VarInt.encode(value.index, outStream); + } @Override public TestCheckpointMark decode( InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - return new TestCheckpointMark(); + return new TestCheckpointMark(VarInt.decodeInt(inStream)); } } }
