Repository: incubator-beam Updated Branches: refs/heads/master 7dcb4c72c -> a5320607a
Test that multiple instances of TestStream are supported Add KeyedResourcePool This interface represents some shared pool of values that may be used by at most one caller at a time. Add LockedKeyedResourcePool which has at most one value per key and at most one user per value at a time. Use KeyedResourcePool in TestStream Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89680975 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89680975 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89680975 Branch: refs/heads/master Commit: 89680975b5a89351ccc4bf99a3a6bd8772d87f40 Parents: 7dcb4c7 Author: Thomas Groh <[email protected]> Authored: Tue Aug 30 14:17:50 2016 -0700 Committer: bchambers <[email protected]> Committed: Wed Aug 31 15:00:39 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/KeyedResourcePool.java | 47 +++++ .../runners/direct/LockedKeyedResourcePool.java | 95 +++++++++ .../direct/TestStreamEvaluatorFactory.java | 141 +++++++------ .../direct/LockedKeyedResourcePoolTest.java | 163 +++++++++++++++ .../direct/TestStreamEvaluatorFactoryTest.java | 206 +++++++++++++++++++ .../apache/beam/sdk/testing/TestStreamTest.java | 29 +++ 6 files changed, 623 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java new file mode 100644 index 0000000..b976b69 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import com.google.common.base.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +/** + * A pool of resources associated with specific keys. Implementations enforce specific use patterns, + * such as limiting the the number of outstanding elements available per key. + */ +interface KeyedResourcePool<K, V> { + /** + * Tries to acquire a value for the provided key, loading it via the provided loader if necessary. + * + * <p>If the returned {@link Optional} contains a value, the caller obtains ownership of that + * value. The value should be released back to this {@link KeyedResourcePool} after the + * caller no longer has use of it using {@link #release(Object, Object)}. + * + * <p>The provided {@link Callable} <b>must not</b> return null; it may either return a non-null + * value or throw an exception. + */ + Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException; + + /** + * Release the provided value, relinquishing ownership of it. Future calls to + * {@link #tryAcquire(Object, Callable)} may return the released value. + */ + void release(K key, V value); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java new file mode 100644 index 0000000..8b1e0b1 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ExecutionError; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; + +/** + * A {@link KeyedResourcePool} which is limited to at most one outstanding instance at a time for + * each key. + */ +class LockedKeyedResourcePool<K, V> implements KeyedResourcePool<K, V> { + /** + * A map from each key to an {@link Optional} of the associated value. At most one value is stored + * per key, and it is obtained by at most one thread at a time. + * + * <p>For each key in this map: + * + * <ul> + * <li>If there is no associated value, then no value has been stored yet. + * <li>If the value is {@code Optional.absent()} then the value is currently in use. + * <li>If the value is {@code Optional.present()} then the contained value is available for use. + * </ul> + */ + public static <K, V> LockedKeyedResourcePool<K, V> create() { + return new LockedKeyedResourcePool<>(); + } + + private final ConcurrentMap<K, Optional<V>> cache; + + private LockedKeyedResourcePool() { + cache = new ConcurrentHashMap<>(); + } + + @Override + public Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException { + Optional<V> value = cache.replace(key, Optional.<V>absent()); + if (value == null) { + // No value already existed, so populate the cache with the value returned by the loader + cache.putIfAbsent(key, Optional.of(load(loader))); + // Some other thread may obtain the result after the putIfAbsent, so retry acquisition + value = cache.replace(key, Optional.<V>absent()); + } + return value; + } + + private V load(Callable<V> loader) throws ExecutionException { + try { + return loader.call(); + } catch (Error t) { + throw new ExecutionError(t); + } catch (RuntimeException e) { + throw new UncheckedExecutionException(e); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + + @Override + public void release(K key, V value) { + Optional<V> replaced = cache.replace(key, Optional.of(value)); + checkNotNull(replaced, "Tried to release before a value was acquired"); + checkState( + !replaced.isPresent(), + "Released a value to a %s where there is already a value present for key %s (%s). " + + "At most one value may be present at a time.", + LockedKeyedResourcePool.class.getSimpleName(), + key, + replaced); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index e9f37ba..3dbd886 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -22,12 +22,12 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Supplier; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.TestStream.ElementEvent; @@ -49,43 +49,52 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; -/** - * The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. - */ +/** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { - private final AtomicBoolean inUse = new AtomicBoolean(false); - private final AtomicReference<Evaluator<?>> evaluator = new AtomicReference<>(); + private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators = + LockedKeyedResourcePool.create(); @Nullable @Override public <InputT> TransformEvaluator<InputT> forApplication( AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) throws Exception { + EvaluationContext evaluationContext) + throws Exception { return createEvaluator((AppliedPTransform) application, evaluationContext); } @Override public void cleanup() throws Exception {} + /** + * Returns the evaluator for the provided application of {@link TestStream}, or null if it is + * already in use. + * + * <p>The documented behavior of {@link TestStream} requires the output of one event to travel + * completely through the pipeline before any additional event, so additional instances that have + * a separate collection of events cannot be created. + */ private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator( AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application, - EvaluationContext evaluationContext) { - if (evaluator.get() == null) { - Evaluator<OutputT> createdEvaluator = new Evaluator<>(application, evaluationContext, inUse); - evaluator.compareAndSet(null, createdEvaluator); - } - if (inUse.compareAndSet(false, true)) { - return evaluator.get(); - } else { - return null; - } + EvaluationContext evaluationContext) + throws ExecutionException { + return evaluators + .tryAcquire(application, new CreateEvaluator<>(application, evaluationContext, evaluators)) + .orNull(); } + /** + * Release the provided {@link Evaluator} after completing an evaluation. The next call to {@link + * #createEvaluator(AppliedPTransform, EvaluationContext)} with the {@link AppliedPTransform} will + * return this evaluator. + */ + private void completeEvaluation(Evaluator<?> evaluator) {} + private static class Evaluator<T> implements TransformEvaluator<Object> { private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application; private final EvaluationContext context; - private final AtomicBoolean inUse; + private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache; private final List<Event<T>> events; private int index; private Instant currentWatermark; @@ -93,49 +102,48 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { private Evaluator( AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application, EvaluationContext context, - AtomicBoolean inUse) { + KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache) { this.application = application; this.context = context; - this.inUse = inUse; + this.cache = cache; this.events = application.getTransform().getEvents(); index = 0; currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; } @Override - public void processElement(WindowedValue<Object> element) throws Exception { - } + public void processElement(WindowedValue<Object> element) throws Exception {} @Override public TransformResult finishBundle() throws Exception { - if (index >= events.size()) { - return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE).build(); - } - Event<T> event = events.get(index); - if (event.getType().equals(EventType.WATERMARK)) { - currentWatermark = ((WatermarkEvent<T>) event).getWatermark(); - } - StepTransformResult.Builder result = - StepTransformResult.withHold(application, currentWatermark); - if (event.getType().equals(EventType.ELEMENT)) { - UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput()); - for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) { - bundle.add(WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), - elem.getTimestamp())); + try { + if (index >= events.size()) { + return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE) + .build(); } - result.addOutput(bundle); - } - if (event.getType().equals(EventType.PROCESSING_TIME)) { - ((TestClock) context.getClock()) - .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance()); + Event<T> event = events.get(index); + if (event.getType().equals(EventType.WATERMARK)) { + currentWatermark = ((WatermarkEvent<T>) event).getWatermark(); + } + StepTransformResult.Builder result = + StepTransformResult.withHold(application, currentWatermark); + if (event.getType().equals(EventType.ELEMENT)) { + UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput()); + for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) { + bundle.add( + WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp())); + } + result.addOutput(bundle); + } + if (event.getType().equals(EventType.PROCESSING_TIME)) { + ((TestClock) context.getClock()) + .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance()); + } + index++; + return result.build(); + } finally { + cache.release(application, this); } - index++; - checkState(inUse.compareAndSet(true, false), - "The InUse flag of a %s was changed while the source evaluator was executing. " - + "%s cannot be split or evaluated in parallel.", - TestStream.class.getSimpleName(), - TestStream.class.getSimpleName()); - return result.build(); } } @@ -181,20 +189,37 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { @Override public PCollection<T> apply(PBegin input) { - setup(input.getPipeline()); - return PCollection.<T>createPrimitiveOutputInternal( - input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) - .setCoder(original.getValueCoder()); - } - - private void setup(Pipeline p) { - PipelineRunner runner = p.getRunner(); - checkState(runner instanceof DirectRunner, + PipelineRunner runner = input.getPipeline().getRunner(); + checkState( + runner instanceof DirectRunner, "%s can only be used when running with the %s", getClass().getSimpleName(), DirectRunner.class.getSimpleName()); ((DirectRunner) runner).setClockSupplier(new TestClockSupplier()); + return PCollection.<T>createPrimitiveOutputInternal( + input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) + .setCoder(original.getValueCoder()); } } } + + private static class CreateEvaluator<OutputT> implements Callable<Evaluator<?>> { + private final AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application; + private final EvaluationContext evaluationContext; + private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators; + + public CreateEvaluator( + AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application, + EvaluationContext evaluationContext, + KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators) { + this.application = application; + this.evaluationContext = evaluationContext; + this.evaluators = evaluators; + } + + @Override + public Evaluator<?> call() throws Exception { + return new Evaluator<>(application, evaluationContext, evaluators); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java new file mode 100644 index 0000000..e1e24a3 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ExecutionError; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link LockedKeyedResourcePool}. + */ +@RunWith(JUnit4.class) +public class LockedKeyedResourcePoolTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + private LockedKeyedResourcePool<String, Integer> cache = + LockedKeyedResourcePool.create(); + + @Test + public void acquireReleaseAcquireLastLoadedOrReleased() throws ExecutionException { + Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + return 3; + } + }); + assertThat(returned.get(), equalTo(3)); + + cache.release("foo", 4); + Optional<Integer> reacquired = cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + return 5; + } + }); + assertThat(reacquired.get(), equalTo(4)); + } + + @Test + public void acquireReleaseReleaseThrows() throws ExecutionException { + Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + return 3; + } + }); + assertThat(returned.get(), equalTo(3)); + + cache.release("foo", 4); + thrown.expect(IllegalStateException.class); + thrown.expectMessage("already a value present"); + thrown.expectMessage("At most one"); + cache.release("foo", 4); + } + + @Test + public void releaseBeforeAcquireThrows() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("before a value was acquired"); + cache.release("bar", 3); + } + + @Test + public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException { + Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + return 3; + } + }); + Optional<Integer> secondReturned = cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + return 3; + } + }); + assertThat(secondReturned.isPresent(), is(false)); + } + + @Test + public void acquireMultipleKeysSucceeds() throws ExecutionException { + Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + return 3; + } + }); + Optional<Integer> secondReturned = cache.tryAcquire("bar", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + return 4; + } + }); + + assertThat(returned.get(), equalTo(3)); + assertThat(secondReturned.get(), equalTo(4)); + } + + @Test + public void acquireThrowsExceptionWrapped() throws ExecutionException { + final Exception cause = new Exception("checkedException"); + thrown.expect(ExecutionException.class); + thrown.expectCause(equalTo(cause)); + cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + throw cause; + } + }); + } + + @Test + public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException { + final RuntimeException cause = new RuntimeException("UncheckedException"); + thrown.expect(UncheckedExecutionException.class); + thrown.expectCause(equalTo(cause)); + cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + throw cause; + } + }); + } + + @Test + public void acquireThrowsErrorWrapped() throws ExecutionException { + final Error cause = new Error("Error"); + thrown.expect(ExecutionError.class); + thrown.expectCause(equalTo(cause)); + cache.tryAcquire("foo", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + throw cause; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java new file mode 100644 index 0000000..7703881 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Iterables; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link TestStreamEvaluatorFactory}. */ +@RunWith(JUnit4.class) +public class TestStreamEvaluatorFactoryTest { + private TestStreamEvaluatorFactory factory = new TestStreamEvaluatorFactory(); + private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + + /** Demonstrates that returned evaluators produce elements in sequence. */ + @Test + public void producesElementsInSequence() throws Exception { + TestPipeline p = TestPipeline.create(); + PCollection<Integer> streamVals = + p.apply( + TestStream.create(VarIntCoder.of()) + .addElements(1, 2, 3) + .addElements(4, 5, 6) + .advanceWatermarkToInfinity()); + + EvaluationContext context = mock(EvaluationContext.class); + when(context.createRootBundle(streamVals)) + .thenReturn( + bundleFactory.createRootBundle(streamVals), bundleFactory.createRootBundle(streamVals)); + + TransformEvaluator<Object> firstEvaluator = + factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + TransformResult firstResult = firstEvaluator.finishBundle(); + + TransformEvaluator<Object> secondEvaluator = + factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + TransformResult secondResult = secondEvaluator.finishBundle(); + + TransformEvaluator<Object> thirdEvaluator = + factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + TransformResult thirdResult = thirdEvaluator.finishBundle(); + + assertThat( + Iterables.getOnlyElement(firstResult.getOutputBundles()) + .commit(Instant.now()) + .getElements(), + Matchers.<WindowedValue<?>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow(1), + WindowedValue.valueInGlobalWindow(2), + WindowedValue.valueInGlobalWindow(3))); + assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + + assertThat( + Iterables.getOnlyElement(secondResult.getOutputBundles()) + .commit(Instant.now()) + .getElements(), + Matchers.<WindowedValue<?>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow(4), + WindowedValue.valueInGlobalWindow(5), + WindowedValue.valueInGlobalWindow(6))); + assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + + assertThat(Iterables.isEmpty(thirdResult.getOutputBundles()), is(true)); + assertThat(thirdResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + } + + /** Demonstrates that at most one evaluator for an application is available at a time. */ + @Test + public void onlyOneEvaluatorAtATime() throws Exception { + TestPipeline p = TestPipeline.create(); + PCollection<Integer> streamVals = + p.apply( + TestStream.create(VarIntCoder.of()).addElements(4, 5, 6).advanceWatermarkToInfinity()); + + EvaluationContext context = mock(EvaluationContext.class); + TransformEvaluator<Object> firstEvaluator = + factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + + // create a second evaluator before the first is finished. The evaluator should not be available + TransformEvaluator<Object> secondEvaluator = + factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + assertThat(secondEvaluator, is(nullValue())); + } + + /** + * Demonstrates that multiple applications of the same {@link TestStream} produce separate + * evaluators. + */ + @Test + public void multipleApplicationsMultipleEvaluators() throws Exception { + TestPipeline p = TestPipeline.create(); + TestStream<Integer> stream = + TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity(); + PCollection<Integer> firstVals = p.apply("Stream One", stream); + PCollection<Integer> secondVals = p.apply("Stream A", stream); + + EvaluationContext context = mock(EvaluationContext.class); + when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals)); + when(context.createRootBundle(secondVals)) + .thenReturn(bundleFactory.createRootBundle(secondVals)); + + TransformEvaluator<Object> firstEvaluator = + factory.forApplication(firstVals.getProducingTransformInternal(), null, context); + // The two evaluators can exist independently + TransformEvaluator<Object> secondEvaluator = + factory.forApplication(secondVals.getProducingTransformInternal(), null, context); + + TransformResult firstResult = firstEvaluator.finishBundle(); + TransformResult secondResult = secondEvaluator.finishBundle(); + + assertThat( + Iterables.getOnlyElement(firstResult.getOutputBundles()) + .commit(Instant.now()) + .getElements(), + Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2))); + assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + + // They both produce equal results, and don't interfere with each other + assertThat( + Iterables.getOnlyElement(secondResult.getOutputBundles()) + .commit(Instant.now()) + .getElements(), + Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2))); + assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + } + + /** + * Demonstrates that multiple applications of different {@link TestStream} produce independent + * evaluators. + */ + @Test + public void multipleStreamsMultipleEvaluators() throws Exception { + TestPipeline p = TestPipeline.create(); + PCollection<Integer> firstVals = + p.apply( + "Stream One", + TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity()); + PCollection<String> secondVals = + p.apply( + "Stream A", + TestStream.create(StringUtf8Coder.of()) + .addElements("Two") + .advanceWatermarkToInfinity()); + + EvaluationContext context = mock(EvaluationContext.class); + when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals)); + when(context.createRootBundle(secondVals)) + .thenReturn(bundleFactory.createRootBundle(secondVals)); + + TransformEvaluator<Object> firstEvaluator = + factory.forApplication(firstVals.getProducingTransformInternal(), null, context); + // The two evaluators can exist independently + TransformEvaluator<Object> secondEvaluator = + factory.forApplication(secondVals.getProducingTransformInternal(), null, context); + + TransformResult firstResult = firstEvaluator.finishBundle(); + TransformResult secondResult = secondEvaluator.finishBundle(); + + assertThat( + Iterables.getOnlyElement(firstResult.getOutputBundles()) + .commit(Instant.now()) + .getElements(), + Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2))); + assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + + assertThat( + Iterables.getOnlyElement(secondResult.getOutputBundles()) + .commit(Instant.now()) + .getElements(), + Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow("Two"))); + assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index 6457f91..a1b4e4a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -265,6 +265,35 @@ public class TestStreamTest implements Serializable { } @Test + @Category(NeedsRunner.class) + public void testMultipleStreams() { + TestStream<String> stream = TestStream.create(StringUtf8Coder.of()) + .addElements("foo", "bar") + .advanceWatermarkToInfinity(); + + TestStream<Integer> other = + TestStream.create(VarIntCoder.of()).addElements(1, 2, 3, 4).advanceWatermarkToInfinity(); + + TestPipeline p = TestPipeline.create(); + PCollection<String> createStrings = + p.apply("CreateStrings", stream) + .apply("WindowStrings", + Window.<String>triggering(AfterPane.elementCountAtLeast(2)) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()); + PAssert.that(createStrings).containsInAnyOrder("foo", "bar"); + PCollection<Integer> createInts = + p.apply("CreateInts", other) + .apply("WindowInts", + Window.<Integer>triggering(AfterPane.elementCountAtLeast(4)) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()); + PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4); + + p.run(); + } + + @Test public void testElementAtPositiveInfinityThrows() { Builder<Integer> stream = TestStream.create(VarIntCoder.of())
