Explicitly Throw in TransformExecutorTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4ee8b73 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4ee8b73 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4ee8b73 Branch: refs/heads/gearpump-runner Commit: b4ee8b730bffb31ee1178303f1dbd5058eb22a11 Parents: 37e891f Author: Thomas Groh <tg...@google.com> Authored: Fri Dec 2 10:56:15 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Fri Dec 2 13:58:38 2016 -0800 ---------------------------------------------------------------------- .../runners/direct/TransformExecutorTest.java | 184 ++++++++++--------- 1 file changed, 97 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4ee8b73/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 85eff65..08b1e18 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -37,13 +37,10 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -63,7 +60,9 @@ import org.mockito.MockitoAnnotations; public class TransformExecutorTest { @Rule public ExpectedException thrown = ExpectedException.none(); private PCollection<String> created; - private PCollection<KV<Integer, String>> downstream; + + private AppliedPTransform<?, ?, ?> createdProducer; + private AppliedPTransform<?, ?, ?> downstreamProducer; private CountDownLatch evaluatorCompleted; @@ -88,15 +87,17 @@ public class TransformExecutorTest { TestPipeline p = TestPipeline.create(); created = p.apply(Create.of("foo", "spam", "third")); - downstream = created.apply(WithKeys.<Integer, String>of(3)); + PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3)); + + createdProducer = created.getProducingTransformInternal(); + downstreamProducer = downstream.getProducingTransformInternal(); when(evaluationContext.getMetrics()).thenReturn(metrics); } @Test public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception { - final TransformResult<Object> result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + final TransformResult<Object> result = StepTransformResult.withoutHold(createdProducer).build(); final AtomicBoolean finishCalled = new AtomicBoolean(false); TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() { @@ -112,8 +113,7 @@ public class TransformExecutorTest { } }; - when(registry.forApplication(created.getProducingTransformInternal(), null)) - .thenReturn(evaluator); + when(registry.forApplication(createdProducer, null)).thenReturn(evaluator); TransformExecutor<Object> executor = TransformExecutor.create( @@ -121,7 +121,7 @@ public class TransformExecutorTest { registry, Collections.<ModelEnforcementFactory>emptyList(), null, - created.getProducingTransformInternal(), + createdProducer, completionCallback, transformEvaluationState); executor.run(); @@ -133,7 +133,7 @@ public class TransformExecutorTest { @Test public void nullTransformEvaluatorTerminates() throws Exception { - when(registry.forApplication(created.getProducingTransformInternal(), null)).thenReturn(null); + when(registry.forApplication(createdProducer, null)).thenReturn(null); TransformExecutor<Object> executor = TransformExecutor.create( @@ -141,7 +141,7 @@ public class TransformExecutorTest { registry, Collections.<ModelEnforcementFactory>emptyList(), null, - created.getProducingTransformInternal(), + createdProducer, completionCallback, transformEvaluationState); executor.run(); @@ -154,7 +154,7 @@ public class TransformExecutorTest { @Test public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception { final TransformResult<String> result = - StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build(); + StepTransformResult.<String>withoutHold(downstreamProducer).build(); final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>(); TransformEvaluator<String> evaluator = new TransformEvaluator<String>() { @@ -175,8 +175,7 @@ public class TransformExecutorTest { WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third"); CommittedBundle<String> inputBundle = bundleFactory.createBundle(created).add(foo).add(spam).add(third).commit(Instant.now()); - when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) - .thenReturn(evaluator); + when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator); TransformExecutor<String> executor = TransformExecutor.create( @@ -184,7 +183,7 @@ public class TransformExecutorTest { registry, Collections.<ModelEnforcementFactory>emptyList(), inputBundle, - downstream.getProducingTransformInternal(), + downstreamProducer, completionCallback, transformEvaluationState); @@ -200,7 +199,7 @@ public class TransformExecutorTest { @Test public void processElementThrowsExceptionCallsback() throws Exception { final TransformResult<String> result = - StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build(); + StepTransformResult.<String>withoutHold(downstreamProducer).build(); final Exception exception = new Exception(); TransformEvaluator<String> evaluator = new TransformEvaluator<String>() { @@ -218,8 +217,7 @@ public class TransformExecutorTest { WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo"); CommittedBundle<String> inputBundle = bundleFactory.createBundle(created).add(foo).commit(Instant.now()); - when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) - .thenReturn(evaluator); + when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator); TransformExecutor<String> executor = TransformExecutor.create( @@ -227,7 +225,7 @@ public class TransformExecutorTest { registry, Collections.<ModelEnforcementFactory>emptyList(), inputBundle, - downstream.getProducingTransformInternal(), + downstreamProducer, completionCallback, transformEvaluationState); Executors.newSingleThreadExecutor().submit(executor); @@ -252,10 +250,8 @@ public class TransformExecutorTest { } }; - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(created).commit(Instant.now()); - when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) - .thenReturn(evaluator); + CommittedBundle<String> inputBundle = bundleFactory.createBundle(created).commit(Instant.now()); + when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator); TransformExecutor<String> executor = TransformExecutor.create( @@ -263,7 +259,7 @@ public class TransformExecutorTest { registry, Collections.<ModelEnforcementFactory>emptyList(), inputBundle, - downstream.getProducingTransformInternal(), + downstreamProducer, completionCallback, transformEvaluationState); Executors.newSingleThreadExecutor().submit(executor); @@ -277,7 +273,7 @@ public class TransformExecutorTest { @Test public void callWithEnforcementAppliesEnforcement() throws Exception { final TransformResult<Object> result = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(downstreamProducer).build(); TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() { @@ -294,8 +290,7 @@ public class TransformExecutorTest { WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar"); CommittedBundle<String> inputBundle = bundleFactory.createBundle(created).add(fooElem).add(barElem).commit(Instant.now()); - when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle)) - .thenReturn(evaluator); + when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator); TestEnforcementFactory enforcement = new TestEnforcementFactory(); TransformExecutor<String> executor = @@ -304,7 +299,7 @@ public class TransformExecutorTest { registry, Collections.<ModelEnforcementFactory>singleton(enforcement), inputBundle, - downstream.getProducingTransformInternal(), + downstreamProducer, completionCallback, transformEvaluationState); @@ -321,21 +316,8 @@ public class TransformExecutorTest { @Test public void callWithEnforcementThrowsOnFinishPropagates() throws Exception { - PCollection<byte[]> pcBytes = - created.apply( - new PTransform<PCollection<String>, PCollection<byte[]>>() { - @Override - public PCollection<byte[]> apply(PCollection<String> input) { - return PCollection.<byte[]>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(ByteArrayCoder.of()); - } - }); - final TransformResult<Object> result = - StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build(); - final CountDownLatch testLatch = new CountDownLatch(1); - final CountDownLatch evaluatorLatch = new CountDownLatch(1); + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() { @@ -344,62 +326,42 @@ public class TransformExecutorTest { @Override public TransformResult<Object> finishBundle() throws Exception { - testLatch.countDown(); - evaluatorLatch.await(); return result; } }; - WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); - CommittedBundle<byte[]> inputBundle = - bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now()); - when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle)) - .thenReturn(evaluator); + WindowedValue<String> fooBytes = WindowedValue.valueInGlobalWindow("foo"); + CommittedBundle<String> inputBundle = + bundleFactory.createBundle(created).add(fooBytes).commit(Instant.now()); + when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator); - TransformExecutor<byte[]> executor = + TransformExecutor<String> executor = TransformExecutor.create( evaluationContext, registry, - Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), + Collections.<ModelEnforcementFactory>singleton( + new ThrowingEnforcementFactory(ThrowingEnforcementFactory.When.AFTER_BUNDLE)), inputBundle, - pcBytes.getProducingTransformInternal(), + downstreamProducer, completionCallback, transformEvaluationState); Future<?> task = Executors.newSingleThreadExecutor().submit(executor); - testLatch.await(); - fooBytes.getValue()[0] = 'b'; - evaluatorLatch.countDown(); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expectCause(isA(RuntimeException.class)); + thrown.expectMessage("afterFinish"); task.get(); } @Test public void callWithEnforcementThrowsOnElementPropagates() throws Exception { - PCollection<byte[]> pcBytes = - created.apply( - new PTransform<PCollection<String>, PCollection<byte[]>>() { - @Override - public PCollection<byte[]> apply(PCollection<String> input) { - return PCollection.<byte[]>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(ByteArrayCoder.of()); - } - }); - final TransformResult<Object> result = - StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build(); - final CountDownLatch testLatch = new CountDownLatch(1); - final CountDownLatch evaluatorLatch = new CountDownLatch(1); + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() { @Override - public void processElement(WindowedValue<Object> element) throws Exception { - testLatch.countDown(); - evaluatorLatch.await(); - } + public void processElement(WindowedValue<Object> element) throws Exception {} @Override public TransformResult<Object> finishBundle() throws Exception { @@ -407,28 +369,26 @@ public class TransformExecutorTest { } }; - WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); - CommittedBundle<byte[]> inputBundle = - bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now()); - when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle)) - .thenReturn(evaluator); + WindowedValue<String> fooBytes = WindowedValue.valueInGlobalWindow("foo"); + CommittedBundle<String> inputBundle = + bundleFactory.createBundle(created).add(fooBytes).commit(Instant.now()); + when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator); - TransformExecutor<byte[]> executor = + TransformExecutor<String> executor = TransformExecutor.create( evaluationContext, registry, - Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), + Collections.<ModelEnforcementFactory>singleton( + new ThrowingEnforcementFactory(ThrowingEnforcementFactory.When.AFTER_ELEMENT)), inputBundle, - pcBytes.getProducingTransformInternal(), + downstreamProducer, completionCallback, transformEvaluationState); Future<?> task = Executors.newSingleThreadExecutor().submit(executor); - testLatch.await(); - fooBytes.getValue()[0] = 'b'; - evaluatorLatch.countDown(); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expectCause(isA(RuntimeException.class)); + thrown.expectMessage("afterElement"); task.get(); } @@ -509,4 +469,54 @@ public class TransformExecutorTest { finishedBundles.add(result); } } + + private static class ThrowingEnforcementFactory implements ModelEnforcementFactory { + private final When when; + + private ThrowingEnforcementFactory(When when) { + this.when = when; + } + + enum When { + BEFORE_BUNDLE, + BEFORE_ELEMENT, + AFTER_ELEMENT, + AFTER_BUNDLE + } + + @Override + public <T> ModelEnforcement<T> forBundle( + CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { + if (when == When.BEFORE_BUNDLE) { + throw new RuntimeException("forBundle"); + } + return new ThrowingEnforcement<>(); + } + + private class ThrowingEnforcement<T> implements ModelEnforcement<T> { + @Override + public void beforeElement(WindowedValue<T> element) { + if (when == When.BEFORE_ELEMENT) { + throw new RuntimeException("beforeElement"); + } + } + + @Override + public void afterElement(WindowedValue<T> element) { + if (when == When.AFTER_ELEMENT) { + throw new RuntimeException("afterElement"); + } + } + + @Override + public void afterFinish( + CommittedBundle<T> input, + TransformResult<T> result, + Iterable<? extends CommittedBundle<?>> outputs) { + if (when == When.AFTER_BUNDLE) { + throw new RuntimeException("afterFinish"); + } + } + } + } }