Repository: incubator-beam
Updated Branches:
  refs/heads/master 912500f13 -> ac252a7e1


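Update Watermarks even if a Reader is empty

When a reader has no elements available but its shard is not done, the
residual shard is now re-emitted timestamped with the reader's current
watermark. This ensures that the watermark keeps advancing, and the
pipeline keeps making progress, even if a reader stops producing elements.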


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff7fe07b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff7fe07b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff7fe07b

Branch: refs/heads/master
Commit: ff7fe07be96de393b763e7b3d213734040aa3795
Parents: 912500f
Author: Thomas Groh <[email protected]>
Authored: Mon Nov 7 12:59:06 2016 -0800
Committer: Thomas Groh <[email protected]>
Committed: Mon Nov 7 15:08:43 2016 -0800

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java    |  6 ++++--
 .../UnboundedReadEvaluatorFactoryTest.java       | 19 +++++++++++++------
 2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff7fe07b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index e529088..fb09b3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -143,12 +144,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
           // If the reader had no elements available, but the shard is not done, reuse it later
           resultBuilder.addUnprocessedElements(
               Collections.<WindowedValue<?>>singleton(
-                  element.withValue(
+                  WindowedValue.timestampedValueInGlobalWindow(
                       UnboundedSourceShard.of(
                           shard.getSource(),
                           shard.getDeduplicator(),
                           reader,
-                          shard.getCheckpoint()))));
+                          shard.getCheckpoint()),
+                      reader.getWatermark())));
         }
       } catch (IOException e) {
         if (reader != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff7fe07b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 9a7fec3..18c7cec 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -260,6 +260,7 @@ public class UnboundedReadEvaluatorFactoryTest {
         (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
             Iterables.getOnlyElement(result.getUnprocessedElements());
     secondEvaluator.processElement(residual);
+
     TransformResult secondResult = secondEvaluator.finishBundle();
 
     // Sanity check that nothing was output (The test would have to run for more than a day to do
@@ -268,11 +269,14 @@ public class UnboundedReadEvaluatorFactoryTest {
         secondOutput.commit(Instant.now()).getElements(),
         Matchers.<WindowedValue<Long>>emptyIterable());
 
-    // Test that even though the reader produced no outputs, there is still a residual shard.
-    UnboundedSourceShard<Long, TestCheckpointMark> residualShard =
-        (UnboundedSourceShard<Long, TestCheckpointMark>)
-            Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue();
-    assertThat(residualShard.getExistingReader(), not(nullValue()));
+    // Test that even though the reader produced no outputs, there is still a residual shard with
+    // the updated watermark.
+    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed =
+        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
+            Iterables.getOnlyElement(secondResult.getUnprocessedElements());
+    assertThat(
+        unprocessed.getTimestamp(), Matchers.<ReadableInstant>greaterThan(residual.getTimestamp()));
+    assertThat(unprocessed.getValue().getExistingReader(), not(nullValue()));
   }
 
   @Test
@@ -377,6 +381,8 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
+    private static int getWatermarkCalls = 0;
+
     static int readerClosedCount;
     static int readerAdvancedCount;
     private final Coder<T> coder;
@@ -447,7 +453,8 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public Instant getWatermark() {
-        return Instant.now();
+        getWatermarkCalls++;
+        return new Instant(index + getWatermarkCalls);
       }
 
       @Override

Reply via email to