Repository: incubator-beam
Updated Branches:
  refs/heads/master a18c27488 -> 3b1c2a3cf


Add a Residual Shard for Empty Readers

When the UnboundedReadEvaluator starts, it checks whether the reader
has any elements available. If not, it terminates immediately (without
taking a new checkpoint). In that case the result should still contain
a residual shard, so that the shard continues to be read later,
reusing the same reader if possible.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/97c89498
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/97c89498
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/97c89498

Branch: refs/heads/master
Commit: 97c89498486817bf5f2d9488e815a481c755e806
Parents: a18c274
Author: Thomas Groh <tg...@google.com>
Authored: Tue Oct 18 10:57:53 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue Oct 18 13:54:09 2016 -0700

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java   | 10 ++++
 .../UnboundedReadEvaluatorFactoryTest.java      | 52 ++++++++++++++++++++
 2 files changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97c89498/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 28f88b3..e529088 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -139,6 +139,16 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
               .addUnprocessedElements(
                   Collections.singleton(
                       WindowedValue.timestampedValueInGlobalWindow(residual, 
watermark)));
+        } else if 
(reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+          // If the reader had no elements available, but the shard is not 
done, reuse it later
+          resultBuilder.addUnprocessedElements(
+              Collections.<WindowedValue<?>>singleton(
+                  element.withValue(
+                      UnboundedSourceShard.of(
+                          shard.getSource(),
+                          shard.getDeduplicator(),
+                          reader,
+                          shard.getCheckpoint()))));
         }
       } catch (IOException e) {
         if (reader != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97c89498/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 76acb03..9a7fec3 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -66,6 +67,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 import org.junit.Before;
@@ -224,6 +226,56 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   @Test
+  public void noElementsAvailableReaderIncludedInResidual() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    // Read with a very slow rate so by the second read there are no more 
elements
+    PCollection<Long> pcollection =
+        p.apply(CountingInput.unbounded().withRate(1L, 
Duration.standardDays(1)));
+    AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
+
+    
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+    Collection<CommittedBundle<?>> initialInputs =
+        new UnboundedReadEvaluatorFactory.InputProvider(context)
+            .getInitialInputs(sourceTransform, 1);
+
+    // Process the initial shard. This might produce some output, and will 
produce a residual shard
+    // which should produce no output when read from within the following day.
+    
when(context.createBundle(pcollection)).thenReturn(bundleFactory.createBundle(pcollection));
+    CommittedBundle<?> inputBundle = Iterables.getOnlyElement(initialInputs);
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> 
evaluator =
+        factory.forApplication(sourceTransform, inputBundle);
+    for (WindowedValue<?> value : inputBundle.getElements()) {
+      evaluator.processElement(
+          (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) 
value);
+    }
+    TransformResult result = evaluator.finishBundle();
+
+    // Read from the residual of the first read. This should not produce any 
output, but should
+    // include a residual shard in the result.
+    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
+    when(context.createBundle(longs)).thenReturn(secondOutput);
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> 
secondEvaluator =
+        factory.forApplication(sourceTransform, inputBundle);
+    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
+        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
+            Iterables.getOnlyElement(result.getUnprocessedElements());
+    secondEvaluator.processElement(residual);
+    TransformResult secondResult = secondEvaluator.finishBundle();
+
+    // Sanity check that nothing was output (at one element per day, the test 
would have to run
+    // for more than a day before a second read could correctly produce output).
+    assertThat(
+        secondOutput.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<Long>>emptyIterable());
+
+    // Test that even though the reader produced no outputs, there is still a 
residual shard.
+    UnboundedSourceShard<Long, TestCheckpointMark> residualShard =
+        (UnboundedSourceShard<Long, TestCheckpointMark>)
+            
Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue();
+    assertThat(residualShard.getExistingReader(), not(nullValue()));
+  }
+
+  @Test
   public void evaluatorReusesReader() throws Exception {
     ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), 
DiscreteDomain.longs());
     TestUnboundedSource<Long> source =

Reply via email to