Repository: incubator-beam Updated Branches: refs/heads/python-sdk 3b4fd5c7d -> 3454d691f
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 3bf63fd..1d8b32c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -84,12 +84,14 @@ public class ViewTest implements Serializable { // anonymous inner classes inside the non-static test methods. @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test @Category(RunnableOnService.class) public void testSingletonSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Integer> view = pipeline.apply("Create47", Create.of(47)).apply(View.<Integer>asSingleton()); @@ -112,7 +114,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedSingletonSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Integer> view = pipeline.apply("Create47", Create.timestamped( @@ -143,7 +144,6 @@ public class ViewTest implements Serializable { @Test @Category(NeedsRunner.class) public void testEmptySingletonSideInput() throws Exception { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Integer> view = pipeline.apply("CreateEmptyIntegers", Create.<Integer>of().withCoder(VarIntCoder.of())) @@ -169,7 +169,6 @@ public class ViewTest implements Serializable { @Test @Category(NeedsRunner.class) public void testNonSingletonSideInput() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> oneTwoThree = pipeline.apply(Create.<Integer>of(1, 2, 3)); final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton()); @@ -194,7 +193,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testListSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<List<Integer>> view = pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23)).apply(View.<Integer>asList()); @@ -221,7 +219,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedListSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<List<Integer>> view = pipeline.apply("CreateSideInput", Create.timestamped( @@ -262,7 +259,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyListSideInput() throws Exception { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<List<Integer>> view = pipeline.apply("CreateEmptyView", Create.<Integer>of().withCoder(VarIntCoder.of())) @@ -289,7 +285,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testListSideInputIsImmutable() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<List<Integer>> view = pipeline.apply("CreateSideInput", Create.of(11)).apply(View.<Integer>asList()); @@ -335,7 +330,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testIterableSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Iterable<Integer>> view = pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23)) @@ -361,7 +355,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedIterableSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Iterable<Integer>> view = pipeline.apply("CreateSideInput", Create.timestamped( @@ -401,7 +394,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyIterableSideInput() throws Exception { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Iterable<Integer>> view = pipeline.apply("CreateEmptyView", Create.<Integer>of().withCoder(VarIntCoder.of())) @@ -427,7 +419,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testIterableSideInputIsImmutable() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Iterable<Integer>> view = pipeline.apply("CreateSideInput", Create.of(11)).apply(View.<Integer>asIterable()); @@ -459,7 +450,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMultimapSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))) @@ -487,7 +477,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMultimapAsEntrySetSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))) @@ -539,7 +528,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMultimapSideInputWithNonDeterministicKeyCoder() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateSideInput", @@ -569,7 +557,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedMultimapSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateSideInput", Create.timestamped( @@ -608,7 +595,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedMultimapAsEntrySetSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateSideInput", Create.timestamped( @@ -651,7 +637,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateSideInput", @@ -691,7 +676,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyMultimapSideInput() throws Exception { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateEmptyView", Create.<KV<String, Integer>>of().withCoder( @@ -720,7 +704,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateEmptyView", @@ -750,7 +733,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMultimapSideInputIsImmutable() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Iterable<Integer>>> view = pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1))) @@ -798,7 +780,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMapSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3))) @@ -825,7 +806,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMapAsEntrySetSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3))) @@ -855,7 +835,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMapSideInputWithNonDeterministicKeyCoder() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", @@ -884,7 +863,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedMapSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", Create.timestamped( @@ -922,7 +900,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedMapAsEntrySetSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", Create.timestamped( @@ -964,7 +941,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedMapSideInputWithNonDeterministicKeyCoder() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", @@ -1004,7 +980,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyMapSideInput() throws Exception { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateEmptyView", Create.<KV<String, Integer>>of().withCoder( @@ -1033,7 +1008,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateEmptyView", Create.<KV<String, Integer>>of().withCoder( @@ -1062,7 +1036,6 @@ public class ViewTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMapSideInputWithNullValuesCatchesDuplicates() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline @@ -1098,7 +1071,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMapSideInputIsImmutable() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1))) @@ -1145,7 +1117,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testCombinedMapSideInput() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20), KV.of("b", 3))) @@ -1172,10 +1143,9 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedSideInputFixedToFixed() { - Pipeline p = TestPipeline.create(); final PCollectionView<Integer> view = - p.apply( + pipeline.apply( "CreateSideInput", Create.timestamped(TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(11)), TimestampedValue.of(3, new Instant(13)))) @@ -1184,7 +1154,7 @@ public class ViewTest implements Serializable { .apply(View.<Integer>asSingleton()); PCollection<String> output = - p.apply("CreateMainInput", Create.timestamped( + pipeline.apply("CreateMainInput", Create.timestamped( TimestampedValue.of("A", new Instant(4)), TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", new Instant(7)))) @@ -1199,16 +1169,15 @@ public class ViewTest implements Serializable { PAssert.that(output).containsInAnyOrder("A1", "B5", "C1"); - p.run(); + pipeline.run(); } @Test @Category(RunnableOnService.class) public void testWindowedSideInputFixedToGlobal() { - Pipeline p = TestPipeline.create(); final PCollectionView<Integer> view = - p.apply( + pipeline.apply( "CreateSideInput", Create.timestamped(TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(11)), TimestampedValue.of(3, new Instant(13)))) @@ -1217,7 +1186,7 @@ public class ViewTest implements Serializable { .apply(View.<Integer>asSingleton()); PCollection<String> output = - p.apply("CreateMainInput", Create.timestamped( + pipeline.apply("CreateMainInput", Create.timestamped( TimestampedValue.of("A", new Instant(4)), TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", new Instant(7)))) @@ -1232,23 +1201,22 @@ public class ViewTest implements Serializable { PAssert.that(output).containsInAnyOrder("A6", "B6", "C6"); - p.run(); + pipeline.run(); } @Test @Category(RunnableOnService.class) public void testWindowedSideInputFixedToFixedWithDefault() { - Pipeline p = TestPipeline.create(); final PCollectionView<Integer> view = - p.apply("CreateSideInput", Create.timestamped( + pipeline.apply("CreateSideInput", Create.timestamped( TimestampedValue.of(2, new Instant(11)), TimestampedValue.of(3, new Instant(13)))) .apply("WindowSideInput", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply(Sum.integersGlobally().asSingletonView()); PCollection<String> output = - p.apply("CreateMainInput", Create.timestamped( + pipeline.apply("CreateMainInput", Create.timestamped( TimestampedValue.of("A", new Instant(4)), TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", new Instant(7)))) @@ -1263,16 +1231,15 @@ public class ViewTest implements Serializable { PAssert.that(output).containsInAnyOrder("A0", "B5", "C0"); - p.run(); + pipeline.run(); } @Test @Category(RunnableOnService.class) public void testSideInputWithNullDefault() { - Pipeline p = TestPipeline.create(); final PCollectionView<Void> view = - p.apply("CreateSideInput", Create.of((Void) null).withCoder(VoidCoder.of())) + pipeline.apply("CreateSideInput", Create.of((Void) null).withCoder(VoidCoder.of())) .apply(Combine.globally(new SerializableFunction<Iterable<Void>, Void>() { @Override public Void apply(Iterable<Void> input) { @@ -1281,7 +1248,7 @@ public class ViewTest implements Serializable { }).asSingletonView()); PCollection<String> output = - p.apply("CreateMainInput", Create.of("")) + pipeline.apply("CreateMainInput", Create.of("")) .apply( "OutputMainAndSideInputs", ParDo.withSideInputs(view).of(new DoFn<String, String>() { @@ -1293,13 +1260,12 @@ public class ViewTest implements Serializable { PAssert.that(output).containsInAnyOrder("null"); - p.run(); + pipeline.run(); } @Test @Category(RunnableOnService.class) public void testSideInputWithNestedIterables() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Iterable<Integer>> view1 = pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of())) .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() { @@ -1386,51 +1352,51 @@ public class ViewTest implements Serializable { @Test public void testViewUnboundedAsSingletonDirect() { - testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asSingleton()); + testViewUnbounded(pipeline, View.<KV<String, Integer>>asSingleton()); } @Test public void testViewUnboundedAsIterableDirect() { - testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asIterable()); + testViewUnbounded(pipeline, View.<KV<String, Integer>>asIterable()); } @Test public void testViewUnboundedAsListDirect() { - testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asList()); + testViewUnbounded(pipeline, View.<KV<String, Integer>>asList()); } @Test public void testViewUnboundedAsMapDirect() { - testViewUnbounded(TestPipeline.create(), View.<String, Integer>asMap()); + testViewUnbounded(pipeline, View.<String, Integer>asMap()); } @Test public void testViewUnboundedAsMultimapDirect() { - testViewUnbounded(TestPipeline.create(), View.<String, Integer>asMultimap()); + testViewUnbounded(pipeline, View.<String, Integer>asMultimap()); } @Test public void testViewNonmergingAsSingletonDirect() { - testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asSingleton()); + testViewNonmerging(pipeline, View.<KV<String, Integer>>asSingleton()); } @Test public void testViewNonmergingAsIterableDirect() { - testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asIterable()); + testViewNonmerging(pipeline, View.<KV<String, Integer>>asIterable()); } @Test public void testViewNonmergingAsListDirect() { - testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asList()); + testViewNonmerging(pipeline, View.<KV<String, Integer>>asList()); } @Test public void testViewNonmergingAsMapDirect() { - testViewNonmerging(TestPipeline.create(), View.<String, Integer>asMap()); + testViewNonmerging(pipeline, View.<String, Integer>asMap()); } @Test public void testViewNonmergingAsMultimapDirect() { - testViewNonmerging(TestPipeline.create(), View.<String, Integer>asMultimap()); + testViewNonmerging(pipeline, View.<String, Integer>asMultimap()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java index f958807..8abbf1a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -29,6 +28,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -63,10 +63,12 @@ public class WithKeysTest { KV.of(100, "bbb") ); + @Rule + public final TestPipeline p = TestPipeline.create(); + @Test @Category(NeedsRunner.class) public void testExtractKeys() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(Arrays.asList(COLLECTION)).withCoder( @@ -83,7 +85,6 @@ public class WithKeysTest { @Test @Category(NeedsRunner.class) public void testConstantKeys() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(Arrays.asList(COLLECTION)).withCoder( @@ -105,7 +106,6 @@ public class WithKeysTest { @Test @Category(NeedsRunner.class) public void testWithKeysWithUnneededWithKeyTypeSucceeds() { - TestPipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(Arrays.asList(COLLECTION)).withCoder( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java index 923b97c..67a2658 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java @@ -41,13 +41,16 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class WithTimestampsTest implements Serializable { + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test @Category(RunnableOnService.class) public void withTimestampsShouldApplyTimestamps() { - TestPipeline p = TestPipeline.create(); SerializableFunction<String, Instant> timestampFn = new SerializableFunction<String, Instant>() { @@ -86,7 +89,6 @@ public class WithTimestampsTest implements Serializable { @Test @Category(NeedsRunner.class) public void withTimestampsBackwardsInTimeShouldThrow() { - TestPipeline p = TestPipeline.create(); SerializableFunction<String, Instant> timestampFn = new SerializableFunction<String, Instant>() { @@ -120,7 +122,6 @@ public class WithTimestampsTest implements Serializable { @Test @Category(RunnableOnService.class) public void withTimestampsBackwardsInTimeAndWithAllowedTimestampSkewShouldSucceed() { - TestPipeline p = TestPipeline.create(); SerializableFunction<String, Instant> timestampFn = new SerializableFunction<String, Instant>() { @@ -181,7 +182,6 @@ public class WithTimestampsTest implements Serializable { } }; - TestPipeline p = TestPipeline.create(); String yearTwoThousand = "946684800000"; p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand)) .apply(WithTimestamps.of(timestampFn)); @@ -197,7 +197,6 @@ public class WithTimestampsTest implements Serializable { @Test @Category(RunnableOnService.class) public void withTimestampsWithNullFnShouldThrowOnConstruction() { - TestPipeline p = TestPipeline.create(); SerializableFunction<String, Instant> timestampFn = null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index e8c8b15..0e5c177 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -118,14 +119,15 @@ public class CoGroupByKeyTest implements Serializable { return coGbkResults; } + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testCoGroupByKeyGetOnly() { final TupleTag<String> tag1 = new TupleTag<>(); final TupleTag<String> tag2 = new TupleTag<>(); - Pipeline p = TestPipeline.create(); - PCollection<KV<Integer, CoGbkResult>> coGbkResults = buildGetOnlyGbk(p, tag1, tag2); @@ -264,7 +266,6 @@ public class CoGroupByKeyTest implements Serializable { final TupleTag<String> addressesTag = new TupleTag<>(); final TupleTag<String> purchasesTag = new TupleTag<>(); - Pipeline p = TestPipeline.create(); PCollection<KV<Integer, CoGbkResult>> coGbkResults = buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag); @@ -456,8 +457,6 @@ public class CoGroupByKeyTest implements Serializable { TupleTag<String> addressesTag = new TupleTag<>(); TupleTag<String> purchasesTag = new TupleTag<>(); - Pipeline p = TestPipeline.create(); - PCollection<KV<Integer, CoGbkResult>> coGbkResults = buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag); @@ -486,8 +485,6 @@ public class CoGroupByKeyTest implements Serializable { TupleTag<String> clicksTag = new TupleTag<>(); TupleTag<String> purchasesTag = new TupleTag<>(); - Pipeline p = TestPipeline.create(); - PCollection<KV<Integer, CoGbkResult>> coGbkResults = buildPurchasesCoGbkWithWindowing(p, clicksTag, purchasesTag); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 3125ae8..e21668e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -29,7 +29,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -62,11 +61,15 @@ import org.mockito.Mockito; public class WindowTest implements Serializable { @Rule + public final transient TestPipeline pipeline = TestPipeline.create() + .enableAbandonedNodeEnforcement(false); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test public void testWindowIntoSetWindowfn() { - WindowingStrategy<?, ?> strategy = TestPipeline.create() + WindowingStrategy<?, ?> strategy = pipeline .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))) .getWindowingStrategy(); @@ -79,7 +82,7 @@ public class WindowTest implements Serializable { public void testWindowIntoTriggersAndAccumulating() { FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); - WindowingStrategy<?, ?> strategy = TestPipeline.create() + WindowingStrategy<?, ?> strategy = pipeline .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) .apply(Window.<String>into(fixed10) .triggering(trigger) @@ -96,7 +99,7 @@ public class WindowTest implements Serializable { public void testWindowPropagatesEachPart() { FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); - WindowingStrategy<?, ?> strategy = TestPipeline.create() + WindowingStrategy<?, ?> strategy = pipeline .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) .apply("Mode", Window.<String>accumulatingFiredPanes()) .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1))) @@ -112,9 +115,10 @@ public class WindowTest implements Serializable { @Test public void testWindowIntoPropagatesLateness() { + FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25)); - WindowingStrategy<?, ?> strategy = TestPipeline.create() + WindowingStrategy<?, ?> strategy = pipeline .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) .apply("WindowInto10", Window.<String>into(fixed10) .withAllowedLateness(Duration.standardDays(1)) @@ -157,7 +161,7 @@ public class WindowTest implements Serializable { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("requires that the accumulation mode"); - TestPipeline.create() + pipeline .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) .apply("Window", Window.<String>into(fixed10)) .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1))) @@ -171,7 +175,7 @@ public class WindowTest implements Serializable { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("requires that the allowed lateness"); - TestPipeline.create() + pipeline .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) .apply("Mode", Window.<String>accumulatingFiredPanes()) .apply("Window", Window.<String>into(fixed10)) @@ -185,7 +189,7 @@ public class WindowTest implements Serializable { @Test @Category(RunnableOnService.class) public void testOutputTimeFnDefault() { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(true); pipeline .apply( @@ -219,7 +223,7 @@ public class WindowTest implements Serializable { @Test @Category(RunnableOnService.class) public void testOutputTimeFnEndOfWindow() { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(true); pipeline.apply( Create.timestamped( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index d4fab17..f7ae5d8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.testing.NeedsRunner; @@ -50,6 +49,10 @@ import org.junit.runners.JUnit4; /** Unit tests for bucketing. */ @RunWith(JUnit4.class) public class WindowingTest implements Serializable { + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -88,7 +91,6 @@ public class WindowingTest implements Serializable { @Test @Category(RunnableOnService.class) public void testPartitioningWindowing() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply( Create.timestamped( @@ -114,7 +116,6 @@ public class WindowingTest implements Serializable { @Test @Category(RunnableOnService.class) public void testNonPartitioningWindowing() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply( Create.timestamped( @@ -140,7 +141,6 @@ public class WindowingTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMergingWindowing() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply( Create.timestamped( @@ -162,7 +162,6 @@ public class WindowingTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowPreservation() { - Pipeline p = TestPipeline.create(); PCollection<String> input1 = p.apply("Create12", Create.timestamped( TimestampedValue.of("a", new Instant(1)), @@ -190,7 +189,6 @@ public class WindowingTest implements Serializable { @Test @Category(NeedsRunner.class) public void testEmptyInput() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.<String>timestamped() .withCoder(StringUtf8Coder.of())); @@ -218,7 +216,6 @@ public class WindowingTest implements Serializable { writer.println("d 11"); } - Pipeline p = TestPipeline.create(); PCollection<String> output = p.begin() .apply("ReadLines", TextIO.Read.from(filename)) .apply(ParDo.of(new ExtractWordsWithTimestampsFn())) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java index d990ee0..d47cddc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableList; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -36,6 +35,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -65,10 +65,12 @@ public class ReshuffleTest { KV.of("k1", (Iterable<Integer>) ImmutableList.of(3)), KV.of("k2", (Iterable<Integer>) ImmutableList.of(4))); + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testJustReshuffle() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> input = pipeline .apply(Create.of(ARBITRARY_KVS) @@ -89,7 +91,6 @@ public class ReshuffleTest { @Test @Category(RunnableOnService.class) public void testReshuffleAfterSessionsAndGroupByKey() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Iterable<Integer>>> input = pipeline .apply(Create.of(GBK_TESTABLE_KVS) @@ -113,7 +114,6 @@ public class ReshuffleTest { @Test @Category(RunnableOnService.class) public void testReshuffleAfterFixedWindowsAndGroupByKey() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Iterable<Integer>>> input = pipeline .apply(Create.of(GBK_TESTABLE_KVS) @@ -137,7 +137,6 @@ public class ReshuffleTest { @Test @Category(RunnableOnService.class) public void testReshuffleAfterSlidingWindowsAndGroupByKey() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Iterable<Integer>>> input = pipeline .apply(Create.of(GBK_TESTABLE_KVS) @@ -161,7 +160,6 @@ public class ReshuffleTest { @Test @Category(RunnableOnService.class) public void testReshuffleAfterFixedWindows() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> input = pipeline .apply(Create.of(ARBITRARY_KVS) @@ -185,7 +183,6 @@ public class ReshuffleTest { @Test @Category(RunnableOnService.class) public void testReshuffleAfterSlidingWindows() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> input = pipeline .apply(Create.of(ARBITRARY_KVS) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java index 1467ae8..b5351da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -33,6 +32,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -41,9 +41,14 @@ import org.junit.runners.JUnit4; /** Unit tests for {@link PCollectionTuple}. */ @RunWith(JUnit4.class) public final class PCollectionTupleTest implements Serializable { + + @Rule + public final transient TestPipeline pipeline = TestPipeline.create() + .enableAbandonedNodeEnforcement(false); + @Test public void testOfThenHas() { - Pipeline pipeline = TestPipeline.create(); + PCollection<Object> pCollection = PCollection.createPrimitiveOutputInternal( pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); TupleTag<Object> tag = new TupleTag<>(); @@ -53,7 +58,6 @@ public final class PCollectionTupleTest implements Serializable { @Test public void testEmpty() { - Pipeline pipeline = TestPipeline.create(); TupleTag<Object> tag = new TupleTag<>(); assertFalse(PCollectionTuple.empty(pipeline).has(tag)); } @@ -61,7 +65,7 @@ public final class PCollectionTupleTest implements Serializable { @Test @Category(RunnableOnService.class) public void testComposePCollectionTuple() { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(true); List<Integer> inputs = Arrays.asList(3, -42, 666); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java index e5f2019..ba7477d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.values; import static org.apache.beam.sdk.TestUtils.LINES; import java.io.File; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.RunnableOnService; @@ -40,6 +39,10 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class PDoneTest { + + @Rule + public final TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -78,8 +81,6 @@ public class PDoneTest { @Test @Category(RunnableOnService.class) public void testEmptyTransform() { - Pipeline p = TestPipeline.create(); - p.begin().apply(new EmptyTransform()); p.run(); @@ -94,8 +95,6 @@ public class PDoneTest { File tmpFile = tmpFolder.newFile("file.txt"); String filename = tmpFile.getPath(); - Pipeline p = TestPipeline.create(); - p.begin().apply(new SimpleTransform(filename)); p.run(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index f33b3a2..8381f12 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -40,6 +39,10 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class TypedPValueTest { + + @Rule + public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public ExpectedException thrown = ExpectedException.none(); @@ -51,9 +54,8 @@ public class TypedPValueTest { } } - private static PCollectionTuple buildPCollectionTupleWithTags( + private PCollectionTuple buildPCollectionTupleWithTags( TupleTag<Integer> mainOutputTag, TupleTag<Integer> sideOutputTag) { - Pipeline p = TestPipeline.create(); PCollection<Integer> input = p.apply(Create.of(1, 2, 3)); PCollectionTuple tuple = input.apply( ParDo @@ -138,7 +140,6 @@ public class TypedPValueTest { @Test public void testParDoWithNoSideOutputsErrorDoesNotMentionTupleTag() { - Pipeline p = TestPipeline.create(); PCollection<EmptyClass> input = p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn())); @@ -158,7 +159,6 @@ public class TypedPValueTest { @Test public void testFinishSpecifyingShouldFailIfNoCoderInferrable() { - Pipeline p = TestPipeline.create(); PCollection<EmptyClass> unencodable = p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn()));