Repository: incubator-beam
Updated Branches:
  refs/heads/master 1451a0ec9 -> 2b3216bbe


Improve ImmutabilityEnforcement

Check per-element, to catch failures within a call to ProcessElement
more quickly.

Move wrapping of exceptions over the course of calls to ProcessElement
to ParDoInProcessEvaluator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/150eac59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/150eac59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/150eac59

Branch: refs/heads/master
Commit: 150eac594a265a700771e07f07a54b802c5c4776
Parents: 740242c
Author: Thomas Groh <[email protected]>
Authored: Tue Apr 5 13:19:24 2016 -0700
Committer: Thomas Groh <[email protected]>
Committed: Thu Apr 7 09:03:11 2016 -0700

----------------------------------------------------------------------
 .../ImmutabilityEnforcementFactory.java         | 36 ++++++++++++--------
 .../inprocess/ParDoInProcessEvaluator.java      | 13 +++++--
 .../ImmutabilityEnforcementFactoryTest.java     |  9 ++---
 .../inprocess/TransformExecutorTest.java        |  6 ++--
 4 files changed, 38 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
index 2e4c07b..8b7ccba 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
@@ -71,25 +71,33 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
     }
 
     @Override
+    public void afterElement(WindowedValue<T> element) {
+      verifyUnmodified(mutationElements.get(element));
+    }
+
+    @Override
     public void afterFinish(
         CommittedBundle<T> input,
         InProcessTransformResult result,
         Iterable<? extends CommittedBundle<?>> outputs) {
       for (MutationDetector detector : mutationElements.values()) {
-        try {
-          detector.verifyUnmodified();
-        } catch (IllegalMutationException e) {
-          throw UserCodeException.wrap(
-              new IllegalMutationException(
-                  String.format(
-                      "PTransform %s illegaly mutated value %s of class %s."
-                          + " Input values must not be mutated in any way.",
-                      transform.getFullName(),
-                      e.getSavedValue(),
-                      e.getSavedValue().getClass()),
-                  e.getSavedValue(),
-                  e.getNewValue()));
-        }
+        verifyUnmodified(detector);
+      }
+    }
+
+    private void verifyUnmodified(MutationDetector detector) {
+      try {
+        detector.verifyUnmodified();
+      } catch (IllegalMutationException e) {
+        throw new IllegalMutationException(
+            String.format(
+                "PTransform %s illegaly mutated value %s of class %s."
+                    + " Input values must not be mutated in any way.",
+                transform.getFullName(),
+                e.getSavedValue(),
+                e.getSavedValue().getClass()),
+            e.getSavedValue(),
+            e.getNewValue());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index 3942bff..4b4d699 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.util.DoFnRunner;
 import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
@@ -56,12 +57,20 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
 
   @Override
   public void processElement(WindowedValue<T> element) {
-    fnRunner.processElement(element);
+    try {
+      fnRunner.processElement(element);
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
   }
 
   @Override
   public InProcessTransformResult finishBundle() {
-    fnRunner.finishBundle();
+    try {
+      fnRunner.finishBundle();
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
     StepTransformResult.Builder resultBuilder;
     CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
     if (state != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
index e65b178..ec779c0 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
@@ -17,8 +17,6 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
-import static org.hamcrest.Matchers.isA;
-
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
@@ -27,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 
@@ -94,8 +91,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
     ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
     enforcement.beforeElement(element);
     element.getValue()[0] = 'f';
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage(consumer.getFullName());
     thrown.expectMessage("illegaly mutated");
     thrown.expectMessage("Input values must not be mutated");
@@ -118,8 +114,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
     enforcement.afterElement(element);
 
     element.getValue()[0] = 'f';
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage(consumer.getFullName());
     thrown.expectMessage("illegaly mutated");
     thrown.expectMessage("Input values must not be mutated");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
index b029dd3..7e87515 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
@@ -34,7 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -413,7 +413,7 @@ public class TransformExecutorTest {
     fooBytes.getValue()[0] = 'b';
     evaluatorLatch.countDown();
 
-    thrown.expectCause(isA(UserCodeException.class));
+    thrown.expectCause(isA(IllegalMutationException.class));
     task.get();
   }
 
@@ -472,7 +472,7 @@ public class TransformExecutorTest {
     fooBytes.getValue()[0] = 'b';
     evaluatorLatch.countDown();
 
-    thrown.expectCause(isA(UserCodeException.class));
+    thrown.expectCause(isA(IllegalMutationException.class));
     task.get();
   }
 

Reply via email to