Handle multiple requests in InProcess Read Primitives. Each source that is invoked by a read should produce its elements serially. Using a queue of available sources ensures that only one worker has access to a source at a time.
Add EmptyTransformEvaluator, to be returned in the case that there are no unused sources. EmptyTransformEvaluator ignores all input, produces no output, and cannot advance the watermark. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115578920 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3eb30924 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3eb30924 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3eb30924 Branch: refs/heads/master Commit: 3eb309242047f08afb105cc8a4e0c05f72131fec Parents: 6c71040 Author: tgroh <[email protected]> Authored: Thu Feb 25 10:44:41 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:28 2016 -0800 ---------------------------------------------------------------------- .../inprocess/BoundedReadEvaluatorFactory.java | 61 ++++++++++---- .../inprocess/EmptyTransformEvaluator.java | 49 ++++++++++++ .../UnboundedReadEvaluatorFactory.java | 75 +++++++++++++----- .../BoundedReadEvaluatorFactoryTest.java | 75 ++++++++++++++---- .../UnboundedReadEvaluatorFactoryTest.java | 83 +++++++++++++++----- 5 files changed, 273 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index d11187c..1c02798 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -27,8 +27,10 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; import java.io.IOException; -import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; @@ -42,34 +44,62 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is * retriggered. */ - private final Map<EvaluatorKey, BoundedReadEvaluator<?>> sourceEvaluators = - new ConcurrentHashMap<>(); + private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>> + sourceEvaluators = new ConcurrentHashMap<>(); @SuppressWarnings({"unchecked", "rawtypes"}) @Override public <InputT> TransformEvaluator<InputT> forApplication( AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) { + InProcessEvaluationContext evaluationContext) + throws IOException { return getTransformEvaluator((AppliedPTransform) application, evaluationContext); } private <OutputT> TransformEvaluator<?> getTransformEvaluator( final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, - final InProcessEvaluationContext evaluationContext) { + final InProcessEvaluationContext evaluationContext) + throws IOException { + BoundedReadEvaluator<?> evaluator = + getTransformEvaluatorQueue(transform, evaluationContext).poll(); + if (evaluator == null) { + return EmptyTransformEvaluator.create(transform); + } + return evaluator; + } + + /** + * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the + * provided application of {@link Bounded Read.Bounded}, 
initializing it if required. + * + * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has + * already done so. + */ + @SuppressWarnings("unchecked") + private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue( + final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, + final InProcessEvaluationContext evaluationContext) + throws IOException { + // Key by the application and the context the evaluation is occurring in (which call to + // Pipeline#run). EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); - @SuppressWarnings("unchecked") - BoundedReadEvaluator<OutputT> result = - (BoundedReadEvaluator<OutputT>) sourceEvaluators.get(key); - if (result == null) { - try { - result = new BoundedReadEvaluator<OutputT>(transform, evaluationContext); - } catch (IOException e) { - throw new RuntimeException(e); + Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue = + (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); + if (evaluatorQueue == null) { + evaluatorQueue = new ConcurrentLinkedQueue<>(); + if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { + // If no queue existed in the evaluators, add an evaluator to initialize the evaluator + // factory for this transform + BoundedReadEvaluator<OutputT> evaluator = + new BoundedReadEvaluator<OutputT>(transform, evaluationContext); + evaluatorQueue.offer(evaluator); + } else { + // otherwise return the existing Queue that arrived before us + evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); } - sourceEvaluators.put(key, result); } - return result; + return evaluatorQueue; } private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { @@ -108,4 +138,3 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { } } } - 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java new file mode 100644 index 0000000..fc09237 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.WindowedValue; + +/** + * A {@link TransformEvaluator} that ignores all input and produces no output. The result of + * invoking {@link #finishBundle()} on this evaluator is to return an + * {@link InProcessTransformResult} with no elements and a timestamp hold equal to + * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold + * will not affect the watermark. 
+ */ +final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> { + public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> transform) { + return new EmptyTransformEvaluator<T>(transform); + } + + private final AppliedPTransform<?, ?, ?> transform; + + private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) { + this.transform = transform; + } + + @Override + public void processElement(WindowedValue<T> element) throws Exception {} + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE) + .build(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index 1852cee..4beac33 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -29,8 +29,10 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; @@ -44,42 +46,74 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused * and any splits are honored. 
*/ - private final Map<EvaluatorKey, UnboundedReadEvaluator<?>> sourceEvaluators = new HashMap<>(); + private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>> + sourceEvaluators = new ConcurrentHashMap<>(); @SuppressWarnings({"unchecked", "rawtypes"}) @Override - public <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) { + public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, + @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) { return getTransformEvaluator((AppliedPTransform) application, evaluationContext); } private <OutputT> TransformEvaluator<?> getTransformEvaluator( final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, final InProcessEvaluationContext evaluationContext) { + UnboundedReadEvaluator<?> currentEvaluator = + getTransformEvaluatorQueue(transform, evaluationContext).poll(); + if (currentEvaluator == null) { + return EmptyTransformEvaluator.create(transform); + } + return currentEvaluator; + } + + /** + * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the + * provided application of {@link Unbounded Read.Unbounded}, initializing it if required. + * + * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has + * already done so. + */ + @SuppressWarnings("unchecked") + private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue( + final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, + final InProcessEvaluationContext evaluationContext) { + // Key by the application and the context the evaluation is occurring in (which call to + // Pipeline#run). 
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); @SuppressWarnings("unchecked") - UnboundedReadEvaluator<OutputT> result = - (UnboundedReadEvaluator<OutputT>) sourceEvaluators.get(key); - if (result == null) { - result = new UnboundedReadEvaluator<OutputT>(transform, evaluationContext); - sourceEvaluators.put(key, result); + Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue = + (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); + if (evaluatorQueue == null) { + evaluatorQueue = new ConcurrentLinkedQueue<>(); + if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { + // If no queue existed in the evaluators, add an evaluator to initialize the evaluator + // factory for this transform + UnboundedReadEvaluator<OutputT> evaluator = + new UnboundedReadEvaluator<OutputT>(transform, evaluationContext, evaluatorQueue); + evaluatorQueue.offer(evaluator); + } else { + // otherwise return the existing Queue that arrived before us + evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); + } } - return result; + return evaluatorQueue; } private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { private static final int ARBITRARY_MAX_ELEMENTS = 10; private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform; private final InProcessEvaluationContext evaluationContext; + private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue; private CheckpointMark checkpointMark; public UnboundedReadEvaluator( AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, - InProcessEvaluationContext evaluationContext) { + InProcessEvaluationContext evaluationContext, + Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) { this.transform = transform; this.evaluationContext = evaluationContext; + this.evaluatorQueue = evaluatorQueue; this.checkpointMark = null; } @@ -103,11 +137,14 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory { } checkpointMark = reader.getCheckpointMark(); checkpointMark.finalizeCheckpoint(); - // TODO: When exercising create initial splits, make this the minimum across all existing - // readers - return StepTransformResult.withHold(transform, reader.getWatermark()) - .addOutput(output) - .build(); + // TODO: When exercising create initial splits, make this the minimum watermark across all + // existing readers + StepTransformResult result = + StepTransformResult.withHold(transform, reader.getWatermark()) + .addOutput(output) + .build(); + evaluatorQueue.offer(this); + return result; } private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java index 0a4c4a1..e17926d 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java @@ -33,6 +33,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -42,14 +43,23 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class BoundedReadEvaluatorFactoryTest { - @Test - public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { - BoundedSource<Long> source = 
CountingSource.upTo(10L); + private BoundedSource<Long> source; + private PCollection<Long> longs; + private TransformEvaluatorFactory factory; + private InProcessEvaluationContext context; + + @Before + public void setup() { + source = CountingSource.upTo(10L); TestPipeline p = TestPipeline.create(); - PCollection<Long> longs = p.apply(Read.from(source)); + longs = p.apply(Read.from(source)); + + factory = new BoundedReadEvaluatorFactory(); + context = mock(InProcessEvaluationContext.class); + } - TransformEvaluatorFactory factory = new BoundedReadEvaluatorFactory(); - InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); + @Test + public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); when(context.createRootBundle(longs)).thenReturn(output); @@ -63,14 +73,13 @@ public class BoundedReadEvaluatorFactoryTest { gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); } + /** + * Demonstrate that acquiring multiple {@link TransformEvaluator TransformEvaluators} for the same + * {@link Bounded Read.Bounded} application with the same evaluation context only produces the + * elements once. 
+ */ @Test - public void boundedSourceInMemoryTransformEvaluatorMultipleCalls() throws Exception { - BoundedSource<Long> source = CountingSource.upTo(10L); - TestPipeline p = TestPipeline.create(); - PCollection<Long> longs = p.apply(Read.from(source)); - - TransformEvaluatorFactory factory = new BoundedReadEvaluatorFactory(); - InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); + public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception { UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); when(context.createRootBundle(longs)).thenReturn(output); @@ -91,7 +100,45 @@ public class BoundedReadEvaluatorFactoryTest { TransformEvaluator<?> secondEvaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); InProcessTransformResult secondResult = secondEvaluator.finishBundle(); - assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + assertThat(secondResult.getOutputBundles(), emptyIterable()); + assertThat( + secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable()); + assertThat( + outputElements, + containsInAnyOrder( + gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); + } + + /** + * Demonstrates that acquiring multiple evaluators from the factory are independent, but + * the elements in the source are only produced once. + */ + @Test + public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception { + UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs); + when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput); + + // create both evaluators before finishing either. 
+ TransformEvaluator<?> evaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + TransformEvaluator<?> secondEvaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + + InProcessTransformResult secondResult = secondEvaluator.finishBundle(); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + Iterable<? extends WindowedValue<Long>> outputElements = + output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(); + + assertThat( + outputElements, + containsInAnyOrder( + gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); + assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + assertThat(secondResult.getOutputBundles(), emptyIterable()); assertThat( secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable()); assertThat( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java index 28f2db5..8640056 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java @@ -16,6 +16,8 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import 
static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -28,6 +30,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -36,6 +39,7 @@ import org.hamcrest.Matchers; import org.joda.time.DateTime; import org.joda.time.Instant; import org.joda.time.ReadableInstant; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,50 +48,56 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class UnboundedReadEvaluatorFactoryTest { - @Test - public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { + private PCollection<Long> longs; + private TransformEvaluatorFactory factory; + private InProcessEvaluationContext context; + private UncommittedBundle<Long> output; + + @Before + public void setup() { UnboundedSource<Long, ?> source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); TestPipeline p = TestPipeline.create(); - PCollection<Long> longs = p.apply(Read.from(source)); + longs = p.apply(Read.from(source)); - TransformEvaluatorFactory factory = new UnboundedReadEvaluatorFactory(); - InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); - UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); + factory = new UnboundedReadEvaluatorFactory(); + context = mock(InProcessEvaluationContext.class); + output = InProcessBundle.unkeyed(longs); when(context.createRootBundle(longs)).thenReturn(output); + } + @Test 
+ public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { TransformEvaluator<?> evaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); + InProcessTransformResult result = evaluator.finishBundle(); assertThat( result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); assertThat( output.commit(Instant.now()).getElements(), - containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), - tgw(3L), tgw(0L))); + containsInAnyOrder( + tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), + tgw(0L))); } + /** + * Demonstrate that multiple sequential creations will produce additional elements if the source + * can provide them. + */ @Test - public void unboundedSourceInMemoryTransformEvaluatorMultipleCalls() throws Exception { - UnboundedSource<Long, ?> source = - CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); - TestPipeline p = TestPipeline.create(); - PCollection<Long> longs = p.apply(Read.from(source)); - - TransformEvaluatorFactory factory = new UnboundedReadEvaluatorFactory(); - InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); - UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); - when(context.createRootBundle(longs)).thenReturn(output); - + public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception { TransformEvaluator<?> evaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); + InProcessTransformResult result = evaluator.finishBundle(); assertThat( result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); assertThat( output.commit(Instant.now()).getElements(), - containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), - tgw(3L), tgw(0L))); + containsInAnyOrder( + tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), 
tgw(7L), tgw(6L), tgw(5L), tgw(3L), + tgw(0L))); UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs); when(context.createRootBundle(longs)).thenReturn(secondOutput); @@ -103,6 +113,37 @@ public class UnboundedReadEvaluatorFactoryTest { tgw(15L), tgw(13L), tgw(10L))); } + // TODO: Once the source is split into multiple sources before evaluating, this test will have to + // be updated. + /** + * Demonstrate that only a single unfinished instance of TransformEvaluator can be created at a + * time, with other calls returning an empty evaluator. + */ + @Test + public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception { + UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs); + + TransformEvaluator<?> evaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + + TransformEvaluator<?> secondEvaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + + InProcessTransformResult secondResult = secondEvaluator.finishBundle(); + InProcessTransformResult result = evaluator.finishBundle(); + + assertThat( + result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); + assertThat( + output.commit(Instant.now()).getElements(), + containsInAnyOrder( + tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), + tgw(0L))); + + assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + assertThat(secondOutput.commit(Instant.now()).getElements(), emptyIterable()); + } + /** * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where * the timestamp is the epoch offset by the value of the element.
