Repository: incubator-beam
Updated Branches:
  refs/heads/master eeb400e11 -> 6901dc09a


Revert "This closes #178"

This reverts commit 9039949d5f518fed84bc7cf7e08870e023b53951, reversing
changes made to e9f1b579a4f5a134b3f00ef011af8d83185e8598.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a3a6d6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a3a6d6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a3a6d6b

Branch: refs/heads/master
Commit: 9a3a6d6befd25f2f1aaa75b2bbbd64cf0c1213a5
Parents: eeb400e
Author: Daniel Halperin <[email protected]>
Authored: Sat Apr 16 17:12:49 2016 -0700
Committer: Daniel Halperin <[email protected]>
Committed: Sat Apr 16 17:12:49 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/options/PipelineOptions.java       |  4 +-
 .../ImmutabilityCheckingBundleFactory.java      | 20 ++---
 .../sdk/options/PipelineOptionsFactoryTest.java |  3 +-
 .../beam/sdk/options/PipelineOptionsTest.java   |  4 +-
 .../beam/sdk/runners/TransformTreeTest.java     | 79 ++++++++++----------
 .../EncodabilityEnforcementFactoryTest.java     |  2 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  | 17 +++--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 14 ++--
 .../beam/sdk/transforms/WithKeysJava8Test.java  |  3 +-
 9 files changed, 78 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index d87e396..17cf5b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -21,8 +21,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
+import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 
@@ -225,7 +225,7 @@ public interface PipelineOptions {
   @Description("The pipeline runner that will be used to execute the pipeline. "
       + "For registered runners, the class name can be specified, otherwise the fully "
       + "qualified name needs to be specified.")
-  @Default.Class(InProcessPipelineRunner.class)
+  @Default.Class(DirectPipelineRunner.class)
   Class<? extends PipelineRunner<?>> getRunner();
   void setRunner(Class<? extends PipelineRunner<?>> kls);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
index bb3d501..0852269 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.MutationDetector;
 import org.apache.beam.sdk.util.MutationDetectors;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -112,16 +113,17 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
         try {
           detector.verifyUnmodified();
         } catch (IllegalMutationException exn) {
-          throw new IllegalMutationException(
-              String.format(
-                  "PTransform %s mutated value %s after it was output (new value was %s)."
-                      + " Values must not be mutated in any way after being output.",
-                  underlying.getPCollection().getProducingTransformInternal().getFullName(),
+          throw UserCodeException.wrap(
+              new IllegalMutationException(
+                  String.format(
+                      "PTransform %s mutated value %s after it was output (new value was %s)."
+                          + " Values must not be mutated in any way after being output.",
+                      underlying.getPCollection().getProducingTransformInternal().getFullName(),
+                      exn.getSavedValue(),
+                      exn.getNewValue()),
                   exn.getSavedValue(),
-                  exn.getNewValue()),
-              exn.getSavedValue(),
-              exn.getNewValue(),
-              exn);
+                  exn.getNewValue(),
+                  exn));
         }
       }
       return underlying.commit(synchronizedProcessingTime);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index e2d4342..62c6909 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
 
@@ -61,7 +60,7 @@ import java.util.Set;
 @RunWith(JUnit4.class)
 public class PipelineOptionsFactoryTest {
   private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS =
-      InProcessPipelineRunner.class;
+      DirectPipelineRunner.class;
 
   @Rule public ExpectedException expectedException = ExpectedException.none();
   @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
index 459272e..dfda528 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner;
+import org.apache.beam.sdk.runners.DirectPipelineRunner;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -87,7 +87,7 @@ public class PipelineOptionsTest {
 
   @Test
   public void testDefaultRunnerIsSet() {
-    assertEquals(InProcessPipelineRunner.class, PipelineOptionsFactory.create().getRunner());
+    assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index a778a0d..7690d2b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -128,46 +128,45 @@ public class TransformTreeTest {
     final EnumSet<TransformsSeen> left =
         EnumSet.noneOf(TransformsSeen.class);
 
-    p.traverseTopologically(
-        new Pipeline.PipelineVisitor() {
-          @Override
-          public void enterCompositeTransform(TransformTreeNode node) {
-            PTransform<?, ?> transform = node.getTransform();
-            if (transform instanceof Sample.SampleAny) {
-              assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
-              assertNotNull(node.getEnclosingNode());
-              assertTrue(node.isCompositeNode());
-            } else if (transform instanceof Write.Bound) {
-              assertTrue(visited.add(TransformsSeen.WRITE));
-              assertNotNull(node.getEnclosingNode());
-              assertTrue(node.isCompositeNode());
-            }
-            assertThat(transform, not(instanceOf(Read.Bounded.class)));
-          }
-
-          @Override
-          public void leaveCompositeTransform(TransformTreeNode node) {
-            PTransform<?, ?> transform = node.getTransform();
-            if (transform instanceof Sample.SampleAny) {
-              assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
-            }
-          }
-
-          @Override
-          public void visitTransform(TransformTreeNode node) {
-            PTransform<?, ?> transform = node.getTransform();
-            // Pick is a composite, should not be visited here.
-            assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
-            assertThat(transform, not(instanceOf(Write.Bound.class)));
-            if (transform instanceof Read.Bounded
-                && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
-              assertTrue(visited.add(TransformsSeen.READ));
-            }
-          }
-
-          @Override
-          public void visitValue(PValue value, TransformTreeNode producer) {}
-        });
+    p.traverseTopologically(new Pipeline.PipelineVisitor() {
+      @Override
+      public void enterCompositeTransform(TransformTreeNode node) {
+        PTransform<?, ?> transform = node.getTransform();
+        if (transform instanceof Sample.SampleAny) {
+          assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
+          assertNotNull(node.getEnclosingNode());
+          assertTrue(node.isCompositeNode());
+        } else if (transform instanceof Write.Bound) {
+          assertTrue(visited.add(TransformsSeen.WRITE));
+          assertNotNull(node.getEnclosingNode());
+          assertTrue(node.isCompositeNode());
+        }
+        assertThat(transform, not(instanceOf(Read.Bounded.class)));
+      }
+
+      @Override
+      public void leaveCompositeTransform(TransformTreeNode node) {
+        PTransform<?, ?> transform = node.getTransform();
+        if (transform instanceof Sample.SampleAny) {
+          assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
+        }
+      }
+
+      @Override
+      public void visitTransform(TransformTreeNode node) {
+        PTransform<?, ?> transform = node.getTransform();
+        // Pick is a composite, should not be visited here.
+        assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
+        assertThat(transform, not(instanceOf(Write.Bound.class)));
+        if (transform instanceof Read.Bounded) {
+          assertTrue(visited.add(TransformsSeen.READ));
+        }
+      }
+
+      @Override
+      public void visitValue(PValue value, TransformTreeNode producer) {
+      }
+    });
 
     assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
     assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
index b3a7d15..7720589 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -55,7 +55,7 @@ public class EncodabilityEnforcementFactoryTest {
   public void encodeFailsThrows() {
     TestPipeline p = TestPipeline.create();
     PCollection<Record> unencodable =
-        p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder()));
+        p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
     AppliedPTransform<?, ?, ?> consumer =
         unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
index 06e71b8..386eacc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.runners.inprocess;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -30,6 +31,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -161,9 +163,10 @@ public class ImmutabilityCheckingBundleFactoryTest {
     root.add(WindowedValue.valueInGlobalWindow(array));
 
     array[1] = 2;
-    thrown.expect(IllegalMutationException.class);
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
     thrown.expectMessage("Values must not be mutated in any way after being output");
-    root.commit(Instant.now());
+    CommittedBundle<byte[]> committed = root.commit(Instant.now());
   }
 
   @Test
@@ -181,9 +184,10 @@ public class ImmutabilityCheckingBundleFactoryTest {
     keyed.add(windowedArray);
 
     array[0] = Byte.MAX_VALUE;
-    thrown.expect(IllegalMutationException.class);
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
     thrown.expectMessage("Values must not be mutated in any way after being output");
-    keyed.commit(Instant.now());
+    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
   }
 
   @Test
@@ -201,9 +205,10 @@ public class ImmutabilityCheckingBundleFactoryTest {
     intermediate.add(windowedArray);
 
     array[2] = -3;
-    thrown.expect(IllegalMutationException.class);
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
     thrown.expectMessage("Values must not be mutated in any way after being output");
-    intermediate.commit(Instant.now());
+    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
   }
 
   private static class IdentityDoFn<T> extends DoFn<T, T> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 83e0f2c..44154e6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.hamcrest.core.AnyOf.anyOf;
@@ -35,6 +36,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
@@ -1117,7 +1119,7 @@ public class ParDoTest implements Serializable {
     input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag))
         .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
 
-    thrown.expect(IllegalStateException.class);
+    thrown.expect(PipelineExecutionException.class);
     thrown.expectMessage("Unable to return a default Coder");
     pipeline.run();
   }
@@ -1420,7 +1422,8 @@ public class ParDoTest implements Serializable {
           }
         }));
 
-    thrown.expect(IllegalMutationException.class);
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
     thrown.expectMessage("output");
     thrown.expectMessage("must not be mutated");
     pipeline.run();
@@ -1469,7 +1472,8 @@ public class ParDoTest implements Serializable {
           }
         }));
 
-    thrown.expect(IllegalMutationException.class);
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
     thrown.expectMessage("output");
     thrown.expectMessage("must not be mutated");
     pipeline.run();
@@ -1495,7 +1499,7 @@ public class ParDoTest implements Serializable {
         }));
 
     thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("Input");
+    thrown.expectMessage("input");
     thrown.expectMessage("must not be mutated");
     pipeline.run();
   }
@@ -1519,7 +1523,7 @@ public class ParDoTest implements Serializable {
         }));
 
     thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("Input");
+    thrown.expectMessage("input");
     thrown.expectMessage("must not be mutated");
     pipeline.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index 1ffb147..a0d1a63 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -64,7 +65,7 @@ public class WithKeysJava8Test {
 
     values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s)));
 
-    thrown.expect(IllegalStateException.class);
+    thrown.expect(PipelineExecutionException.class);
     thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
     thrown.expectMessage("Cannot provide a coder for type variable K");
     thrown.expectMessage("the actual type is unknown due to erasure.");

Reply via email to