Repository: incubator-beam Updated Branches: refs/heads/master 912500f13 -> ac252a7e1
Update Watermarks even if a Reader is empty. This ensures that the pipeline will make progress even if a reader stops producing elements. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff7fe07b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff7fe07b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff7fe07b Branch: refs/heads/master Commit: ff7fe07be96de393b763e7b3d213734040aa3795 Parents: 912500f Author: Thomas Groh <[email protected]> Authored: Mon Nov 7 12:59:06 2016 -0800 Committer: Thomas Groh <[email protected]> Committed: Mon Nov 7 15:08:43 2016 -0800 ---------------------------------------------------------------------- .../direct/UnboundedReadEvaluatorFactory.java | 6 ++++-- .../UnboundedReadEvaluatorFactoryTest.java | 19 +++++++++++++------ 2 files changed, 17 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff7fe07b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index e529088..fb09b3e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; 
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -143,12 +144,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // If the reader had no elements available, but the shard is not done, reuse it later resultBuilder.addUnprocessedElements( Collections.<WindowedValue<?>>singleton( - element.withValue( + WindowedValue.timestampedValueInGlobalWindow( UnboundedSourceShard.of( shard.getSource(), shard.getDeduplicator(), reader, - shard.getCheckpoint())))); + shard.getCheckpoint()), + reader.getWatermark()))); } } catch (IOException e) { if (reader != null) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff7fe07b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 9a7fec3..18c7cec 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -260,6 +260,7 @@ public class UnboundedReadEvaluatorFactoryTest { (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) Iterables.getOnlyElement(result.getUnprocessedElements()); secondEvaluator.processElement(residual); + TransformResult secondResult = secondEvaluator.finishBundle(); // Sanity check that nothing was output (The test would have to run for more than a day to do @@ -268,11 +269,14 @@ public class UnboundedReadEvaluatorFactoryTest { secondOutput.commit(Instant.now()).getElements(), Matchers.<WindowedValue<Long>>emptyIterable()); - // 
Test that even though the reader produced no outputs, there is still a residual shard. - UnboundedSourceShard<Long, TestCheckpointMark> residualShard = - (UnboundedSourceShard<Long, TestCheckpointMark>) - Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue(); - assertThat(residualShard.getExistingReader(), not(nullValue())); + // Test that even though the reader produced no outputs, there is still a residual shard with + // the updated watermark. + WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed = + (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) + Iterables.getOnlyElement(secondResult.getUnprocessedElements()); + assertThat( + unprocessed.getTimestamp(), Matchers.<ReadableInstant>greaterThan(residual.getTimestamp())); + assertThat(unprocessed.getValue().getExistingReader(), not(nullValue())); } @Test @@ -377,6 +381,8 @@ public class UnboundedReadEvaluatorFactoryTest { } private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> { + private static int getWatermarkCalls = 0; + static int readerClosedCount; static int readerAdvancedCount; private final Coder<T> coder; @@ -447,7 +453,8 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public Instant getWatermark() { - return Instant.now(); + getWatermarkCalls++; + return new Instant(index + getWatermarkCalls); } @Override
