Repository: incubator-beam
Updated Branches:
  refs/heads/master 0ddba6d8d -> ecbc64117


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 77c0bcb..25642dd 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
@@ -28,16 +28,20 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Range;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import 
org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
+import 
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -68,16 +72,17 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class UnboundedReadEvaluatorFactoryTest {
   private PCollection<Long> longs;
-  private TransformEvaluatorFactory factory;
+  private UnboundedReadEvaluatorFactory factory;
   private EvaluationContext context;
   private UncommittedBundle<Long> output;
 
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
+  private UnboundedSource<Long, ?> source;
+
   @Before
   public void setup() {
-    UnboundedSource<Long, ?> source =
-        CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
+    source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
     TestPipeline p = TestPipeline.create();
     longs = p.apply(Read.from(source));
 
@@ -89,49 +94,36 @@ public class UnboundedReadEvaluatorFactoryTest {
 
   @Test
   public void unboundedSourceInMemoryTransformEvaluatorProducesElements() 
throws Exception {
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
+    
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
 
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getWatermarkHold(), 
Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), 
tgw(5L), tgw(3L),
-            tgw(0L)));
-  }
+    Collection<CommittedBundle<?>> initialInputs =
+        factory.getInitialInputs(longs.getProducingTransformInternal());
 
-  /**
-   * Demonstrate that multiple sequential creations will produce additional 
elements if the source
-   * can provide them.
-   */
-  @Test
-  public void 
unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws 
Exception {
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
+    CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
+    UnboundedSourceShard<Long, ?> inputShard =
+        (UnboundedSourceShard<Long, ?>)
+            Iterables.getOnlyElement(inputShards.getElements()).getValue();
+    TransformEvaluator<? super UnboundedSourceShard<Long, ?>> evaluator =
+        factory.forApplication(
+            longs.getProducingTransformInternal(), inputShards);
 
+    evaluator.processElement((WindowedValue) 
Iterables.getOnlyElement(inputShards.getElements()));
     TransformResult result = evaluator.finishBundle();
+
+    WindowedValue<?> residual = 
Iterables.getOnlyElement(result.getUnprocessedElements());
+    assertThat(
+        residual.getTimestamp(), 
Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+    UnboundedSourceShard<Long, ?> residualShard =
+        (UnboundedSourceShard<Long, ?>) residual.getValue();
     assertThat(
-        result.getWatermarkHold(), 
Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+        residualShard.getSource(),
+        Matchers.<UnboundedSource<Long, ?>>equalTo(inputShard.getSource()));
+    assertThat(residualShard.getCheckpoint(), not(nullValue()));
     assertThat(
         output.commit(Instant.now()).getElements(),
         containsInAnyOrder(
             tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), 
tgw(5L), tgw(3L),
             tgw(0L)));
-
-    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
-    when(context.createBundle(longs)).thenReturn(secondOutput);
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
-    TransformResult secondResult = secondEvaluator.finishBundle();
-    assertThat(
-        secondResult.getWatermarkHold(),
-        Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        secondOutput.commit(Instant.now()).getElements(),
-        containsInAnyOrder(tgw(11L), tgw(12L), tgw(14L), tgw(18L), tgw(19L), 
tgw(17L), tgw(16L),
-            tgw(15L), tgw(13L), tgw(10L)));
   }
 
   @Test
@@ -148,18 +140,32 @@ public class UnboundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
+    
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+    Collection<CommittedBundle<?>> initialInputs = 
factory.getInitialInputs(sourceTransform);
+
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null);
+    CommittedBundle<?> inputBundle = Iterables.getOnlyElement(initialInputs);
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> 
evaluator =
+        factory.forApplication(sourceTransform, inputBundle);
 
-    evaluator.finishBundle();
+    for (WindowedValue<?> value : inputBundle.getElements()) {
+      evaluator.processElement(
+          (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) 
value);
+    }
+    TransformResult result = evaluator.finishBundle();
     assertThat(
         output.commit(Instant.now()).getElements(),
         containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L)));
 
     UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
     when(context.createBundle(longs)).thenReturn(secondOutput);
-    TransformEvaluator<?> secondEvaluator = 
factory.forApplication(sourceTransform, null);
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> 
secondEvaluator =
+        factory.forApplication(sourceTransform, inputBundle);
+    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
+        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
+            Iterables.getOnlyElement(result.getUnprocessedElements());
+    secondEvaluator.processElement(residual);
     secondEvaluator.finishBundle();
     assertThat(
         secondOutput.commit(Instant.now()).getElements(),
@@ -167,10 +173,8 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   @Test
-  public void evaluatorClosesReaderAfterOutputCount() throws Exception {
-    ContiguousSet<Long> elems = ContiguousSet.create(
-        Range.closed(0L, 20L * 
UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT),
-        DiscreteDomain.longs());
+  public void evaluatorReusesReader() throws Exception {
+    ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), 
DiscreteDomain.longs());
     TestUnboundedSource<Long> source =
         new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new 
Long[0]));
 
@@ -178,86 +182,80 @@ public class UnboundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
+    
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);
 
-    for (int i = 0; i < UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT + 
1; i++) {
-      TransformEvaluator<?> evaluator = 
factory.forApplication(sourceTransform, null);
-      evaluator.finishBundle();
-    }
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
-  }
-
-  @Test
-  public void evaluatorReusesReaderBeforeCount() throws Exception {
-    TestUnboundedSource<Long> source =
-        new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
-
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
+    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
+        WindowedValue.valueInGlobalWindow(
+            UnboundedSourceShard.unstarted(source, 
NeverDeduplicator.create()));
+    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> 
inputBundle =
+        bundleFactory
+            .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
+            .add(shard)
+            .commit(Instant.now());
+    UnboundedReadEvaluatorFactory factory =
+        new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */);
+    factory.getInitialInputs(pcollection.getProducingTransformInternal());
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> 
evaluator =
+        factory.forApplication(sourceTransform, inputBundle);
+    evaluator.processElement(shard);
+    TransformResult result = evaluator.finishBundle();
 
-    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
-    when(context.createBundle(pcollection)).thenReturn(output);
+    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
+        inputBundle.withElements(
+        (Iterable<WindowedValue<UnboundedSourceShard<Long, 
TestCheckpointMark>>>)
+            result.getUnprocessedElements());
 
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null);
-    evaluator.finishBundle();
-    CommittedBundle<Long> committed = output.commit(Instant.now());
-    assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
-    assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(4));
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> 
secondEvaluator =
+        factory.forApplication(sourceTransform, residual);
+    
secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
+    secondEvaluator.finishBundle();
 
-    evaluator = factory.forApplication(sourceTransform, null);
-    evaluator.finishBundle();
     assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
-    // Tried to advance again, even with no elements
-    assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(5));
   }
 
   @Test
-  public void evaluatorNoElementsReusesReaderAlways() throws Exception {
-    TestUnboundedSource<Long> source = new 
TestUnboundedSource<>(BigEndianLongCoder.of());
+  public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception 
{
+    ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), 
DiscreteDomain.longs());
+    TestUnboundedSource<Long> source =
+        new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new 
Long[0]));
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
+    
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);
 
-    for (int i = 0; i < 2 * 
UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT; i++) {
-      TransformEvaluator<?> evaluator = 
factory.forApplication(sourceTransform, null);
-      evaluator.finishBundle();
-    }
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
-    assertThat(TestUnboundedSource.readerAdvancedCount,
-        equalTo(2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT));
-  }
-
-  // TODO: Once the source is split into multiple sources before evaluating, 
this test will have to
-  // be updated.
-  /**
-   * Demonstrate that only a single unfinished instance of TransformEvaluator 
can be created at a
-   * time, with other calls returning an empty evaluator.
-   */
-  @Test
-  public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() 
throws Exception {
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
+    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
+        WindowedValue.valueInGlobalWindow(
+            UnboundedSourceShard.unstarted(source, 
NeverDeduplicator.create()));
+    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> 
inputBundle =
+        bundleFactory
+            .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
+            .add(shard)
+            .commit(Instant.now());
+    UnboundedReadEvaluatorFactory factory =
+        new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */);
+    factory.getInitialInputs(pcollection.getProducingTransformInternal());
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> 
evaluator =
+        factory.forApplication(sourceTransform, inputBundle);
+    evaluator.processElement(shard);
+    TransformResult result = evaluator.finishBundle();
 
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
+    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
+        inputBundle.withElements(
+            (Iterable<WindowedValue<UnboundedSourceShard<Long, 
TestCheckpointMark>>>)
+                result.getUnprocessedElements());
 
-    assertThat(secondEvaluator, nullValue());
-    TransformResult result = evaluator.finishBundle();
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> 
secondEvaluator =
+        factory.forApplication(sourceTransform, residual);
+    
secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
+    secondEvaluator.finishBundle();
 
-    assertThat(
-        result.getWatermarkHold(), 
Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), 
tgw(5L), tgw(3L),
-            tgw(0L)));
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
   }
 
   /**

Reply via email to