Repository: incubator-beam Updated Branches: refs/heads/master eeb400e11 -> 6901dc09a
Revert "This closes #178" This reverts commit 9039949d5f518fed84bc7cf7e08870e023b53951, reversing changes made to e9f1b579a4f5a134b3f00ef011af8d83185e8598. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a3a6d6b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a3a6d6b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a3a6d6b Branch: refs/heads/master Commit: 9a3a6d6befd25f2f1aaa75b2bbbd64cf0c1213a5 Parents: eeb400e Author: Daniel Halperin <[email protected]> Authored: Sat Apr 16 17:12:49 2016 -0700 Committer: Daniel Halperin <[email protected]> Committed: Sat Apr 16 17:12:49 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/options/PipelineOptions.java | 4 +- .../ImmutabilityCheckingBundleFactory.java | 20 ++--- .../sdk/options/PipelineOptionsFactoryTest.java | 3 +- .../beam/sdk/options/PipelineOptionsTest.java | 4 +- .../beam/sdk/runners/TransformTreeTest.java | 79 ++++++++++---------- .../EncodabilityEnforcementFactoryTest.java | 2 +- .../ImmutabilityCheckingBundleFactoryTest.java | 17 +++-- .../apache/beam/sdk/transforms/ParDoTest.java | 14 ++-- .../beam/sdk/transforms/WithKeysJava8Test.java | 3 +- 9 files changed, 78 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index d87e396..17cf5b3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -21,8 +21,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer; +import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; @@ -225,7 +225,7 @@ public interface PipelineOptions { @Description("The pipeline runner that will be used to execute the pipeline. " + "For registered runners, the class name can be specified, otherwise the fully " + "qualified name needs to be specified.") - @Default.Class(InProcessPipelineRunner.class) + @Default.Class(DirectPipelineRunner.class) Class<? extends PipelineRunner<?>> getRunner(); void setRunner(Class<? extends PipelineRunner<?>> kls); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java index bb3d501..0852269 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -112,16 +113,17 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory { try { detector.verifyUnmodified(); } catch (IllegalMutationException exn) { - throw new IllegalMutationException( - String.format( - "PTransform %s mutated value %s after it was output (new value was %s)." - + " Values must not be mutated in any way after being output.", - underlying.getPCollection().getProducingTransformInternal().getFullName(), + throw UserCodeException.wrap( + new IllegalMutationException( + String.format( + "PTransform %s mutated value %s after it was output (new value was %s)." + + " Values must not be mutated in any way after being output.", + underlying.getPCollection().getProducingTransformInternal().getFullName(), + exn.getSavedValue(), + exn.getNewValue()), exn.getSavedValue(), - exn.getNewValue()), - exn.getSavedValue(), - exn.getNewValue(), - exn); + exn.getNewValue(), + exn)); } } return underlying.commit(synchronizedProcessingTime); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index e2d4342..62c6909 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.RestoreSystemProperties; @@ -61,7 +60,7 @@ import java.util.Set; @RunWith(JUnit4.class) public class PipelineOptionsFactoryTest { private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS = - InProcessPipelineRunner.class; + DirectPipelineRunner.class; @Rule public ExpectedException expectedException = ExpectedException.none(); @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java index 459272e..dfda528 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner; +import org.apache.beam.sdk.runners.DirectPipelineRunner; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -87,7 +87,7 @@ public class PipelineOptionsTest { @Test public void testDefaultRunnerIsSet() { - assertEquals(InProcessPipelineRunner.class, PipelineOptionsFactory.create().getRunner()); + assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index a778a0d..7690d2b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -128,46 +128,45 @@ public class TransformTreeTest { final EnumSet<TransformsSeen> left = EnumSet.noneOf(TransformsSeen.class); - p.traverseTopologically( - new Pipeline.PipelineVisitor() { - @Override - public void enterCompositeTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - if (transform instanceof Sample.SampleAny) { - assertTrue(visited.add(TransformsSeen.SAMPLE_ANY)); - assertNotNull(node.getEnclosingNode()); - assertTrue(node.isCompositeNode()); - } else if (transform instanceof Write.Bound) { - assertTrue(visited.add(TransformsSeen.WRITE)); - assertNotNull(node.getEnclosingNode()); - assertTrue(node.isCompositeNode()); - } - assertThat(transform, not(instanceOf(Read.Bounded.class))); - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - if (transform instanceof Sample.SampleAny) { - assertTrue(left.add(TransformsSeen.SAMPLE_ANY)); - } - } - - @Override - public void visitTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - // Pick is a composite, should not be visited here. - assertThat(transform, not(instanceOf(Sample.SampleAny.class))); - assertThat(transform, not(instanceOf(Write.Bound.class))); - if (transform instanceof Read.Bounded - && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) { - assertTrue(visited.add(TransformsSeen.READ)); - } - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) {} - }); + p.traverseTopologically(new Pipeline.PipelineVisitor() { + @Override + public void enterCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform instanceof Sample.SampleAny) { + assertTrue(visited.add(TransformsSeen.SAMPLE_ANY)); + assertNotNull(node.getEnclosingNode()); + assertTrue(node.isCompositeNode()); + } else if (transform instanceof Write.Bound) { + assertTrue(visited.add(TransformsSeen.WRITE)); + assertNotNull(node.getEnclosingNode()); + assertTrue(node.isCompositeNode()); + } + assertThat(transform, not(instanceOf(Read.Bounded.class))); + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform instanceof Sample.SampleAny) { + assertTrue(left.add(TransformsSeen.SAMPLE_ANY)); + } + } + + @Override + public void visitTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + // Pick is a composite, should not be visited here. + assertThat(transform, not(instanceOf(Sample.SampleAny.class))); + assertThat(transform, not(instanceOf(Write.Bound.class))); + if (transform instanceof Read.Bounded) { + assertTrue(visited.add(TransformsSeen.READ)); + } + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + } + }); assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class))); assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java index b3a7d15..7720589 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java @@ -55,7 +55,7 @@ public class EncodabilityEnforcementFactoryTest { public void encodeFailsThrows() { TestPipeline p = TestPipeline.create(); PCollection<Record> unencodable = - p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder())); + p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder())); AppliedPTransform<?, ?, ?> consumer = unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java index 06e71b8..386eacc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.runners.inprocess; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -30,6 +31,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.IllegalMutationException; +import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -161,9 +163,10 @@ public class ImmutabilityCheckingBundleFactoryTest { root.add(WindowedValue.valueInGlobalWindow(array)); array[1] = 2; - thrown.expect(IllegalMutationException.class); + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalMutationException.class)); thrown.expectMessage("Values must not be mutated in any way after being output"); - root.commit(Instant.now()); + CommittedBundle<byte[]> committed = root.commit(Instant.now()); } @Test @@ -181,9 +184,10 @@ public class ImmutabilityCheckingBundleFactoryTest { keyed.add(windowedArray); array[0] = Byte.MAX_VALUE; - thrown.expect(IllegalMutationException.class); + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalMutationException.class)); thrown.expectMessage("Values must not be mutated in any way after being output"); - keyed.commit(Instant.now()); + CommittedBundle<byte[]> committed = keyed.commit(Instant.now()); } @Test @@ -201,9 +205,10 @@ public class ImmutabilityCheckingBundleFactoryTest { intermediate.add(windowedArray); array[2] = -3; - thrown.expect(IllegalMutationException.class); + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalMutationException.class)); thrown.expectMessage("Values must not be mutated in any way after being output"); - intermediate.commit(Instant.now()); + CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); } private static class IdentityDoFn<T> extends DoFn<T, T> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 83e0f2c..44154e6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -26,6 +26,7 @@ import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.isA; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.hamcrest.core.AnyOf.anyOf; @@ -35,6 +36,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; @@ -1117,7 +1119,7 @@ public class ParDoTest implements Serializable { input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag)) .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); - thrown.expect(IllegalStateException.class); + thrown.expect(PipelineExecutionException.class); thrown.expectMessage("Unable to return a default Coder"); pipeline.run(); } @@ -1420,7 +1422,8 @@ public class ParDoTest implements Serializable { } })); - thrown.expect(IllegalMutationException.class); + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(isA(IllegalMutationException.class)); thrown.expectMessage("output"); thrown.expectMessage("must not be mutated"); pipeline.run(); @@ -1469,7 +1472,8 @@ public class ParDoTest implements Serializable { } })); - thrown.expect(IllegalMutationException.class); + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(isA(IllegalMutationException.class)); thrown.expectMessage("output"); thrown.expectMessage("must not be mutated"); pipeline.run(); @@ -1495,7 +1499,7 @@ public class ParDoTest implements Serializable { })); thrown.expect(IllegalMutationException.class); - thrown.expectMessage("Input"); + thrown.expectMessage("input"); thrown.expectMessage("must not be mutated"); pipeline.run(); } @@ -1519,7 +1523,7 @@ public class ParDoTest implements Serializable { })); thrown.expect(IllegalMutationException.class); - thrown.expectMessage("Input"); + thrown.expectMessage("input"); thrown.expectMessage("must not be mutated"); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a3a6d6b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java index 1ffb147..a0d1a63 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -64,7 +65,7 @@ public class WithKeysJava8Test { values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s))); - thrown.expect(IllegalStateException.class); + thrown.expect(PipelineExecutionException.class); thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys"); thrown.expectMessage("Cannot provide a coder for type variable K"); thrown.expectMessage("the actual type is unknown due to erasure.");
