Repository: incubator-beam Updated Branches: refs/heads/master 0ddba6d8d -> ecbc64117
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 77c0bcb..25642dd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; @@ -28,16 +28,20 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ContiguousSet; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Range; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; +import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; @@ -68,16 +72,17 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class UnboundedReadEvaluatorFactoryTest { private PCollection<Long> longs; - private TransformEvaluatorFactory factory; + private UnboundedReadEvaluatorFactory factory; private EvaluationContext context; private UncommittedBundle<Long> output; private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + private UnboundedSource<Long, ?> source; + @Before public void setup() { - UnboundedSource<Long, ?> source = - CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); + source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); TestPipeline p = TestPipeline.create(); longs = p.apply(Read.from(source)); @@ -89,49 +94,36 @@ public class UnboundedReadEvaluatorFactoryTest { @Test public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { - TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null); + when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); - assertThat( - output.commit(Instant.now()).getElements(), - containsInAnyOrder( - tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), - tgw(0L))); - } + Collection<CommittedBundle<?>> initialInputs = + factory.getInitialInputs(longs.getProducingTransformInternal()); - /** - * Demonstrate that multiple sequential creations will produce additional elements if the source - * can provide them. - */ - @Test - public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception { - TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null); + CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs); + UnboundedSourceShard<Long, ?> inputShard = + (UnboundedSourceShard<Long, ?>) + Iterables.getOnlyElement(inputShards.getElements()).getValue(); + TransformEvaluator<? super UnboundedSourceShard<Long, ?>> evaluator = + factory.forApplication( + longs.getProducingTransformInternal(), inputShards); + evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements())); TransformResult result = evaluator.finishBundle(); + + WindowedValue<?> residual = Iterables.getOnlyElement(result.getUnprocessedElements()); + assertThat( + residual.getTimestamp(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); + UnboundedSourceShard<Long, ?> residualShard = + (UnboundedSourceShard<Long, ?>) residual.getValue(); assertThat( - result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); + residualShard.getSource(), + Matchers.<UnboundedSource<Long, ?>>equalTo(inputShard.getSource())); + assertThat(residualShard.getCheckpoint(), not(nullValue())); assertThat( output.commit(Instant.now()).getElements(), containsInAnyOrder( tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), tgw(0L))); - - UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs); - when(context.createBundle(longs)).thenReturn(secondOutput); - TransformEvaluator<?> secondEvaluator = - factory.forApplication(longs.getProducingTransformInternal(), null); - TransformResult secondResult = secondEvaluator.finishBundle(); - assertThat( - secondResult.getWatermarkHold(), - Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); - assertThat( - secondOutput.commit(Instant.now()).getElements(), - containsInAnyOrder(tgw(11L), tgw(12L), tgw(14L), tgw(18L), tgw(19L), tgw(17L), tgw(16L), - tgw(15L), tgw(13L), tgw(10L))); } @Test @@ -148,18 +140,32 @@ public class UnboundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); + Collection<CommittedBundle<?>> initialInputs = factory.getInitialInputs(sourceTransform); + UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); when(context.createBundle(pcollection)).thenReturn(output); - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); + CommittedBundle<?> inputBundle = Iterables.getOnlyElement(initialInputs); + TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator = + factory.forApplication(sourceTransform, inputBundle); - evaluator.finishBundle(); + for (WindowedValue<?> value : inputBundle.getElements()) { + evaluator.processElement( + (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value); + } + TransformResult result = evaluator.finishBundle(); assertThat( output.commit(Instant.now()).getElements(), containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L))); UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs); when(context.createBundle(longs)).thenReturn(secondOutput); - TransformEvaluator<?> secondEvaluator = factory.forApplication(sourceTransform, null); + TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator = + factory.forApplication(sourceTransform, inputBundle); + WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> residual = + (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) + Iterables.getOnlyElement(result.getUnprocessedElements()); + secondEvaluator.processElement(residual); secondEvaluator.finishBundle(); assertThat( secondOutput.commit(Instant.now()).getElements(), @@ -167,10 +173,8 @@ public class UnboundedReadEvaluatorFactoryTest { } @Test - public void evaluatorClosesReaderAfterOutputCount() throws Exception { - ContiguousSet<Long> elems = ContiguousSet.create( - Range.closed(0L, 20L * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT), - DiscreteDomain.longs()); + public void evaluatorReusesReader() throws Exception { + ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs()); TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0])); @@ -178,86 +182,80 @@ public class UnboundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); when(context.createBundle(pcollection)).thenReturn(output); - for (int i = 0; i < UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT + 1; i++) { - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); - evaluator.finishBundle(); - } - assertThat(TestUnboundedSource.readerClosedCount, equalTo(1)); - } - - @Test - public void evaluatorReusesReaderBeforeCount() throws Exception { - TestUnboundedSource<Long> source = - new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L); - - TestPipeline p = TestPipeline.create(); - PCollection<Long> pcollection = p.apply(Read.from(source)); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard = + WindowedValue.valueInGlobalWindow( + UnboundedSourceShard.unstarted(source, NeverDeduplicator.create())); + CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle = + bundleFactory + .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle() + .add(shard) + .commit(Instant.now()); + UnboundedReadEvaluatorFactory factory = + new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */); + factory.getInitialInputs(pcollection.getProducingTransformInternal()); + TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator = + factory.forApplication(sourceTransform, inputBundle); + evaluator.processElement(shard); + TransformResult result = evaluator.finishBundle(); - UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); - when(context.createBundle(pcollection)).thenReturn(output); + CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual = + inputBundle.withElements( + (Iterable<WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>>) + result.getUnprocessedElements()); - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); - evaluator.finishBundle(); - CommittedBundle<Long> committed = output.commit(Instant.now()); - assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3)); - assertThat(TestUnboundedSource.readerClosedCount, equalTo(0)); - assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(4)); + TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator = + factory.forApplication(sourceTransform, residual); + secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements())); + secondEvaluator.finishBundle(); - evaluator = factory.forApplication(sourceTransform, null); - evaluator.finishBundle(); assertThat(TestUnboundedSource.readerClosedCount, equalTo(0)); - // Tried to advance again, even with no elements - assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(5)); } @Test - public void evaluatorNoElementsReusesReaderAlways() throws Exception { - TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of()); + public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception { + ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs()); + TestUnboundedSource<Long> source = + new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0])); TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); when(context.createBundle(pcollection)).thenReturn(output); - for (int i = 0; i < 2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT; i++) { - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); - evaluator.finishBundle(); - } - assertThat(TestUnboundedSource.readerClosedCount, equalTo(0)); - assertThat(TestUnboundedSource.readerAdvancedCount, - equalTo(2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT)); - } - - // TODO: Once the source is split into multiple sources before evaluating, this test will have to - // be updated. - /** - * Demonstrate that only a single unfinished instance of TransformEvaluator can be created at a - * time, with other calls returning an empty evaluator. - */ - @Test - public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception { - TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null); + WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard = + WindowedValue.valueInGlobalWindow( + UnboundedSourceShard.unstarted(source, NeverDeduplicator.create())); + CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle = + bundleFactory + .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle() + .add(shard) + .commit(Instant.now()); + UnboundedReadEvaluatorFactory factory = + new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */); + factory.getInitialInputs(pcollection.getProducingTransformInternal()); + TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator = + factory.forApplication(sourceTransform, inputBundle); + evaluator.processElement(shard); + TransformResult result = evaluator.finishBundle(); - TransformEvaluator<?> secondEvaluator = - factory.forApplication(longs.getProducingTransformInternal(), null); + CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual = + inputBundle.withElements( + (Iterable<WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>>) + result.getUnprocessedElements()); - assertThat(secondEvaluator, nullValue()); - TransformResult result = evaluator.finishBundle(); + TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> secondEvaluator = + factory.forApplication(sourceTransform, residual); + secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements())); + secondEvaluator.finishBundle(); - assertThat( - result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); - assertThat( - output.commit(Instant.now()).getElements(), - containsInAnyOrder( - tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), - tgw(0L))); + assertThat(TestUnboundedSource.readerClosedCount, equalTo(2)); } /**