Repository: incubator-beam Updated Branches: refs/heads/master 1451a0ec9 -> 2b3216bbe
Improve ImmutabilityEnforcement. Check per-element, to catch failures within a call to ProcessElement more quickly. Move wrapping of exceptions over the course of calls to ProcessElement to ParDoInProcessEvaluator. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/150eac59 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/150eac59 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/150eac59 Branch: refs/heads/master Commit: 150eac594a265a700771e07f07a54b802c5c4776 Parents: 740242c Author: Thomas Groh <[email protected]> Authored: Tue Apr 5 13:19:24 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu Apr 7 09:03:11 2016 -0700 ---------------------------------------------------------------------- .../ImmutabilityEnforcementFactory.java | 36 ++++++++++++-------- .../inprocess/ParDoInProcessEvaluator.java | 13 +++++-- .../ImmutabilityEnforcementFactoryTest.java | 9 ++--- .../inprocess/TransformExecutorTest.java | 6 ++-- 4 files changed, 38 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java index 2e4c07b..8b7ccba 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java @@ -71,25 +71,33 @@ class 
ImmutabilityEnforcementFactory implements ModelEnforcementFactory { } @Override + public void afterElement(WindowedValue<T> element) { + verifyUnmodified(mutationElements.get(element)); + } + + @Override public void afterFinish( CommittedBundle<T> input, InProcessTransformResult result, Iterable<? extends CommittedBundle<?>> outputs) { for (MutationDetector detector : mutationElements.values()) { - try { - detector.verifyUnmodified(); - } catch (IllegalMutationException e) { - throw UserCodeException.wrap( - new IllegalMutationException( - String.format( - "PTransform %s illegaly mutated value %s of class %s." - + " Input values must not be mutated in any way.", - transform.getFullName(), - e.getSavedValue(), - e.getSavedValue().getClass()), - e.getSavedValue(), - e.getNewValue())); - } + verifyUnmodified(detector); + } + } + + private void verifyUnmodified(MutationDetector detector) { + try { + detector.verifyUnmodified(); + } catch (IllegalMutationException e) { + throw new IllegalMutationException( + String.format( + "PTransform %s illegaly mutated value %s of class %s." 
+ + " Input values must not be mutated in any way.", + transform.getFullName(), + e.getSavedValue(), + e.getSavedValue().getClass()), + e.getSavedValue(), + e.getNewValue()); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java index 3942bff..4b4d699 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.util.DoFnRunner; import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager; +import com.google.cloud.dataflow.sdk.util.UserCodeException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; @@ -56,12 +57,20 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> { @Override public void processElement(WindowedValue<T> element) { - fnRunner.processElement(element); + try { + fnRunner.processElement(element); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } } @Override public InProcessTransformResult finishBundle() { - fnRunner.finishBundle(); + try { + fnRunner.finishBundle(); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } StepTransformResult.Builder resultBuilder; 
CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); if (state != null) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java index e65b178..ec779c0 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java @@ -17,8 +17,6 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; -import static org.hamcrest.Matchers.isA; - import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -27,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.util.IllegalMutationException; -import com.google.cloud.dataflow.sdk.util.UserCodeException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -94,8 +91,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); enforcement.beforeElement(element); element.getValue()[0] = 'f'; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + 
thrown.expect(IllegalMutationException.class); thrown.expectMessage(consumer.getFullName()); thrown.expectMessage("illegaly mutated"); thrown.expectMessage("Input values must not be mutated"); @@ -118,8 +114,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { enforcement.afterElement(element); element.getValue()[0] = 'f'; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage(consumer.getFullName()); thrown.expectMessage("illegaly mutated"); thrown.expectMessage("Input values must not be mutated"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java index b029dd3..7e87515 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java @@ -34,7 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.WithKeys; -import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.IllegalMutationException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -413,7 +413,7 @@ public class TransformExecutorTest { fooBytes.getValue()[0] = 
'b'; evaluatorLatch.countDown(); - thrown.expectCause(isA(UserCodeException.class)); + thrown.expectCause(isA(IllegalMutationException.class)); task.get(); } @@ -472,7 +472,7 @@ public class TransformExecutorTest { fooBytes.getValue()[0] = 'b'; evaluatorLatch.countDown(); - thrown.expectCause(isA(UserCodeException.class)); + thrown.expectCause(isA(IllegalMutationException.class)); task.get(); }
