Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3b4fd5c7d -> 3454d691f


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 3bf63fd..1d8b32c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -84,12 +84,14 @@ public class ViewTest implements Serializable {
   // anonymous inner classes inside the non-static test methods.
 
   @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
   @Category(RunnableOnService.class)
   public void testSingletonSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Integer> view =
         pipeline.apply("Create47", 
Create.of(47)).apply(View.<Integer>asSingleton());
@@ -112,7 +114,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedSingletonSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Integer> view =
         pipeline.apply("Create47", Create.timestamped(
@@ -143,7 +144,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testEmptySingletonSideInput() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Integer> view =
         pipeline.apply("CreateEmptyIntegers", 
Create.<Integer>of().withCoder(VarIntCoder.of()))
@@ -169,7 +169,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testNonSingletonSideInput() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<Integer> oneTwoThree = pipeline.apply(Create.<Integer>of(1, 2, 
3));
     final PCollectionView<Integer> view = 
oneTwoThree.apply(View.<Integer>asSingleton());
@@ -194,7 +193,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testListSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<List<Integer>> view =
         pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 
23)).apply(View.<Integer>asList());
@@ -221,7 +219,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedListSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<List<Integer>> view =
         pipeline.apply("CreateSideInput", Create.timestamped(
@@ -262,7 +259,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyListSideInput() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<List<Integer>> view =
         pipeline.apply("CreateEmptyView", 
Create.<Integer>of().withCoder(VarIntCoder.of()))
@@ -289,7 +285,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testListSideInputIsImmutable() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<List<Integer>> view =
         pipeline.apply("CreateSideInput", 
Create.of(11)).apply(View.<Integer>asList());
@@ -335,7 +330,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testIterableSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Iterable<Integer>> view =
         pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23))
@@ -361,7 +355,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedIterableSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Iterable<Integer>> view =
         pipeline.apply("CreateSideInput", Create.timestamped(
@@ -401,7 +394,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyIterableSideInput() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Iterable<Integer>> view =
         pipeline.apply("CreateEmptyView", 
Create.<Integer>of().withCoder(VarIntCoder.of()))
@@ -427,7 +419,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testIterableSideInputIsImmutable() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Iterable<Integer>> view =
         pipeline.apply("CreateSideInput", 
Create.of(11)).apply(View.<Integer>asIterable());
@@ -459,7 +450,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMultimapSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 
2), KV.of("b", 3)))
@@ -487,7 +477,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMultimapAsEntrySetSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 
2), KV.of("b", 3)))
@@ -539,7 +528,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMultimapSideInputWithNonDeterministicKeyCoder() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateSideInput",
@@ -569,7 +557,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedMultimapSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateSideInput", Create.timestamped(
@@ -608,7 +595,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedMultimapAsEntrySetSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateSideInput", Create.timestamped(
@@ -651,7 +637,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateSideInput",
@@ -691,7 +676,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyMultimapSideInput() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateEmptyView", Create.<KV<String, 
Integer>>of().withCoder(
@@ -720,7 +704,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateEmptyView",
@@ -750,7 +733,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMultimapSideInputIsImmutable() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1)))
@@ -798,7 +780,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMapSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 
3)))
@@ -825,7 +806,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMapAsEntrySetSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 
3)))
@@ -855,7 +835,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMapSideInputWithNonDeterministicKeyCoder() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput",
@@ -884,7 +863,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedMapSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput", Create.timestamped(
@@ -922,7 +900,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedMapAsEntrySetSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput", Create.timestamped(
@@ -964,7 +941,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedMapSideInputWithNonDeterministicKeyCoder() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput",
@@ -1004,7 +980,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyMapSideInput() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateEmptyView", Create.<KV<String, 
Integer>>of().withCoder(
@@ -1033,7 +1008,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateEmptyView", Create.<KV<String, 
Integer>>of().withCoder(
@@ -1062,7 +1036,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMapSideInputWithNullValuesCatchesDuplicates() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline
@@ -1098,7 +1071,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMapSideInputIsImmutable() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1)))
@@ -1145,7 +1117,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testCombinedMapSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 
20), KV.of("b", 3)))
@@ -1172,10 +1143,9 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedSideInputFixedToFixed() {
-    Pipeline p = TestPipeline.create();
 
     final PCollectionView<Integer> view =
-        p.apply(
+        pipeline.apply(
              "CreateSideInput",
              Create.timestamped(TimestampedValue.of(1, new Instant(1)),
                  TimestampedValue.of(2, new Instant(11)), 
TimestampedValue.of(3, new Instant(13))))
@@ -1184,7 +1154,7 @@ public class ViewTest implements Serializable {
             .apply(View.<Integer>asSingleton());
 
     PCollection<String> output =
-        p.apply("CreateMainInput", Create.timestamped(
+        pipeline.apply("CreateMainInput", Create.timestamped(
                                        TimestampedValue.of("A", new 
Instant(4)),
                                        TimestampedValue.of("B", new 
Instant(15)),
                                        TimestampedValue.of("C", new 
Instant(7))))
@@ -1199,16 +1169,15 @@ public class ViewTest implements Serializable {
 
     PAssert.that(output).containsInAnyOrder("A1", "B5", "C1");
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedSideInputFixedToGlobal() {
-    Pipeline p = TestPipeline.create();
 
     final PCollectionView<Integer> view =
-        p.apply(
+        pipeline.apply(
              "CreateSideInput",
              Create.timestamped(TimestampedValue.of(1, new Instant(1)),
                  TimestampedValue.of(2, new Instant(11)), 
TimestampedValue.of(3, new Instant(13))))
@@ -1217,7 +1186,7 @@ public class ViewTest implements Serializable {
             .apply(View.<Integer>asSingleton());
 
     PCollection<String> output =
-        p.apply("CreateMainInput", Create.timestamped(
+        pipeline.apply("CreateMainInput", Create.timestamped(
                                        TimestampedValue.of("A", new 
Instant(4)),
                                        TimestampedValue.of("B", new 
Instant(15)),
                                        TimestampedValue.of("C", new 
Instant(7))))
@@ -1232,23 +1201,22 @@ public class ViewTest implements Serializable {
 
     PAssert.that(output).containsInAnyOrder("A6", "B6", "C6");
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedSideInputFixedToFixedWithDefault() {
-    Pipeline p = TestPipeline.create();
 
     final PCollectionView<Integer> view =
-        p.apply("CreateSideInput", Create.timestamped(
+        pipeline.apply("CreateSideInput", Create.timestamped(
                                        TimestampedValue.of(2, new Instant(11)),
                                        TimestampedValue.of(3, new 
Instant(13))))
             .apply("WindowSideInput", 
Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply(Sum.integersGlobally().asSingletonView());
 
     PCollection<String> output =
-        p.apply("CreateMainInput", Create.timestamped(
+        pipeline.apply("CreateMainInput", Create.timestamped(
                                        TimestampedValue.of("A", new 
Instant(4)),
                                        TimestampedValue.of("B", new 
Instant(15)),
                                        TimestampedValue.of("C", new 
Instant(7))))
@@ -1263,16 +1231,15 @@ public class ViewTest implements Serializable {
 
     PAssert.that(output).containsInAnyOrder("A0", "B5", "C0");
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testSideInputWithNullDefault() {
-    Pipeline p = TestPipeline.create();
 
     final PCollectionView<Void> view =
-        p.apply("CreateSideInput", Create.of((Void) null).withCoder(VoidCoder.of()))
+        pipeline.apply("CreateSideInput", Create.of((Void) null).withCoder(VoidCoder.of()))
             .apply(Combine.globally(new SerializableFunction<Iterable<Void>, 
Void>() {
               @Override
               public Void apply(Iterable<Void> input) {
@@ -1281,7 +1248,7 @@ public class ViewTest implements Serializable {
             }).asSingletonView());
 
     PCollection<String> output =
-        p.apply("CreateMainInput", Create.of(""))
+        pipeline.apply("CreateMainInput", Create.of(""))
             .apply(
                 "OutputMainAndSideInputs",
                 ParDo.withSideInputs(view).of(new DoFn<String, String>() {
@@ -1293,13 +1260,12 @@ public class ViewTest implements Serializable {
 
     PAssert.that(output).containsInAnyOrder("null");
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testSideInputWithNestedIterables() {
-    Pipeline pipeline = TestPipeline.create();
     final PCollectionView<Iterable<Integer>> view1 =
         pipeline.apply("CreateVoid1", Create.of((Void) 
null).withCoder(VoidCoder.of()))
             .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() {
@@ -1386,51 +1352,51 @@ public class ViewTest implements Serializable {
 
   @Test
   public void testViewUnboundedAsSingletonDirect() {
-    testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asSingleton());
+    testViewUnbounded(pipeline, View.<KV<String, Integer>>asSingleton());
   }
 
   @Test
   public void testViewUnboundedAsIterableDirect() {
-    testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asIterable());
+    testViewUnbounded(pipeline, View.<KV<String, Integer>>asIterable());
   }
 
   @Test
   public void testViewUnboundedAsListDirect() {
-    testViewUnbounded(TestPipeline.create(), View.<KV<String, Integer>>asList());
+    testViewUnbounded(pipeline, View.<KV<String, Integer>>asList());
   }
 
   @Test
   public void testViewUnboundedAsMapDirect() {
-    testViewUnbounded(TestPipeline.create(), View.<String, Integer>asMap());
+    testViewUnbounded(pipeline, View.<String, Integer>asMap());
   }
 
   @Test
   public void testViewUnboundedAsMultimapDirect() {
-    testViewUnbounded(TestPipeline.create(), View.<String, Integer>asMultimap());
+    testViewUnbounded(pipeline, View.<String, Integer>asMultimap());
   }
 
   @Test
   public void testViewNonmergingAsSingletonDirect() {
-    testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asSingleton());
+    testViewNonmerging(pipeline, View.<KV<String, Integer>>asSingleton());
   }
 
   @Test
   public void testViewNonmergingAsIterableDirect() {
-    testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asIterable());
+    testViewNonmerging(pipeline, View.<KV<String, Integer>>asIterable());
   }
 
   @Test
   public void testViewNonmergingAsListDirect() {
-    testViewNonmerging(TestPipeline.create(), View.<KV<String, Integer>>asList());
+    testViewNonmerging(pipeline, View.<KV<String, Integer>>asList());
   }
 
   @Test
   public void testViewNonmergingAsMapDirect() {
-    testViewNonmerging(TestPipeline.create(), View.<String, Integer>asMap());
+    testViewNonmerging(pipeline, View.<String, Integer>asMap());
   }
 
   @Test
   public void testViewNonmergingAsMultimapDirect() {
-    testViewNonmerging(TestPipeline.create(), View.<String, Integer>asMultimap());
+    testViewNonmerging(pipeline, View.<String, Integer>asMultimap());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java
index f958807..8abbf1a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -29,6 +28,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -63,10 +63,12 @@ public class WithKeysTest {
     KV.of(100, "bbb")
   );
 
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(NeedsRunner.class)
   public void testExtractKeys() {
-    Pipeline p = TestPipeline.create();
 
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(COLLECTION)).withCoder(
@@ -83,7 +85,6 @@ public class WithKeysTest {
   @Test
   @Category(NeedsRunner.class)
   public void testConstantKeys() {
-    Pipeline p = TestPipeline.create();
 
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(COLLECTION)).withCoder(
@@ -105,7 +106,6 @@ public class WithKeysTest {
   @Test
   @Category(NeedsRunner.class)
   public void testWithKeysWithUnneededWithKeyTypeSucceeds() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(COLLECTION)).withCoder(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index 923b97c..67a2658 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -41,13 +41,16 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class WithTimestampsTest implements Serializable {
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
   @Category(RunnableOnService.class)
   public void withTimestampsShouldApplyTimestamps() {
-    TestPipeline p = TestPipeline.create();
 
     SerializableFunction<String, Instant> timestampFn =
         new SerializableFunction<String, Instant>() {
@@ -86,7 +89,6 @@ public class WithTimestampsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void withTimestampsBackwardsInTimeShouldThrow() {
-    TestPipeline p = TestPipeline.create();
 
     SerializableFunction<String, Instant> timestampFn =
         new SerializableFunction<String, Instant>() {
@@ -120,7 +122,6 @@ public class WithTimestampsTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void withTimestampsBackwardsInTimeAndWithAllowedTimestampSkewShouldSucceed() {
-    TestPipeline p = TestPipeline.create();
 
     SerializableFunction<String, Instant> timestampFn =
         new SerializableFunction<String, Instant>() {
@@ -181,7 +182,6 @@ public class WithTimestampsTest implements Serializable {
           }
         };
 
-    TestPipeline p = TestPipeline.create();
     String yearTwoThousand = "946684800000";
     p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), 
yearTwoThousand))
      .apply(WithTimestamps.of(timestampFn));
@@ -197,7 +197,6 @@ public class WithTimestampsTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void withTimestampsWithNullFnShouldThrowOnConstruction() {
-    TestPipeline p = TestPipeline.create();
 
     SerializableFunction<String, Instant> timestampFn = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index e8c8b15..0e5c177 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -118,14 +119,15 @@ public class CoGroupByKeyTest implements Serializable {
     return coGbkResults;
   }
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testCoGroupByKeyGetOnly() {
     final TupleTag<String> tag1 = new TupleTag<>();
     final TupleTag<String> tag2 = new TupleTag<>();
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<Integer, CoGbkResult>> coGbkResults =
         buildGetOnlyGbk(p, tag1, tag2);
 
@@ -264,7 +266,6 @@ public class CoGroupByKeyTest implements Serializable {
     final TupleTag<String> addressesTag = new TupleTag<>();
     final TupleTag<String> purchasesTag = new TupleTag<>();
 
-    Pipeline p = TestPipeline.create();
 
     PCollection<KV<Integer, CoGbkResult>> coGbkResults =
         buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
@@ -456,8 +457,6 @@ public class CoGroupByKeyTest implements Serializable {
     TupleTag<String> addressesTag = new TupleTag<>();
     TupleTag<String> purchasesTag = new TupleTag<>();
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<Integer, CoGbkResult>> coGbkResults =
         buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
 
@@ -486,8 +485,6 @@ public class CoGroupByKeyTest implements Serializable {
     TupleTag<String> clicksTag = new TupleTag<>();
     TupleTag<String> purchasesTag = new TupleTag<>();
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<Integer, CoGbkResult>> coGbkResults =
         buildPurchasesCoGbkWithWindowing(p, clicksTag, purchasesTag);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 3125ae8..e21668e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -62,11 +61,15 @@ import org.mockito.Mockito;
 public class WindowTest implements Serializable {
 
   @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create()
+                                                             
.enableAbandonedNodeEnforcement(false);
+
+  @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void testWindowIntoSetWindowfn() {
-    WindowingStrategy<?, ?> strategy = TestPipeline.create()
+    WindowingStrategy<?, ?> strategy = pipeline
       .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
       
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))))
       .getWindowingStrategy();
@@ -79,7 +82,7 @@ public class WindowTest implements Serializable {
   public void testWindowIntoTriggersAndAccumulating() {
     FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
     Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
-    WindowingStrategy<?, ?> strategy = TestPipeline.create()
+    WindowingStrategy<?, ?> strategy = pipeline
       .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
       .apply(Window.<String>into(fixed10)
           .triggering(trigger)
@@ -96,7 +99,7 @@ public class WindowTest implements Serializable {
   public void testWindowPropagatesEachPart() {
     FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
     Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
-    WindowingStrategy<?, ?> strategy = TestPipeline.create()
+    WindowingStrategy<?, ?> strategy = pipeline
       .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
       .apply("Mode", Window.<String>accumulatingFiredPanes())
       .apply("Lateness", 
Window.<String>withAllowedLateness(Duration.standardDays(1)))
@@ -112,9 +115,10 @@ public class WindowTest implements Serializable {
 
   @Test
   public void testWindowIntoPropagatesLateness() {
+
     FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
     FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
-    WindowingStrategy<?, ?> strategy = TestPipeline.create()
+    WindowingStrategy<?, ?> strategy = pipeline
         .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
         .apply("WindowInto10", Window.<String>into(fixed10)
             .withAllowedLateness(Duration.standardDays(1))
@@ -157,7 +161,7 @@ public class WindowTest implements Serializable {
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("requires that the accumulation mode");
-    TestPipeline.create()
+    pipeline
       .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
       .apply("Window", Window.<String>into(fixed10))
       .apply("Lateness", 
Window.<String>withAllowedLateness(Duration.standardDays(1)))
@@ -171,7 +175,7 @@ public class WindowTest implements Serializable {
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("requires that the allowed lateness");
-    TestPipeline.create()
+    pipeline
       .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
       .apply("Mode", Window.<String>accumulatingFiredPanes())
       .apply("Window", Window.<String>into(fixed10))
@@ -185,7 +189,7 @@ public class WindowTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testOutputTimeFnDefault() {
-    Pipeline pipeline = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(true);
 
     pipeline
         .apply(
@@ -219,7 +223,7 @@ public class WindowTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testOutputTimeFnEndOfWindow() {
-    Pipeline pipeline = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(true);
 
     pipeline.apply(
         Create.timestamped(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index d4fab17..f7ae5d8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -50,6 +49,10 @@ import org.junit.runners.JUnit4;
 /** Unit tests for bucketing. */
 @RunWith(JUnit4.class)
 public class WindowingTest implements Serializable {
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Rule
   public transient TemporaryFolder tmpFolder = new TemporaryFolder();
 
@@ -88,7 +91,6 @@ public class WindowingTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testPartitioningWindowing() {
-    Pipeline p = TestPipeline.create();
     PCollection<String> input =
         p.apply(
             Create.timestamped(
@@ -114,7 +116,6 @@ public class WindowingTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testNonPartitioningWindowing() {
-    Pipeline p = TestPipeline.create();
     PCollection<String> input =
         p.apply(
             Create.timestamped(
@@ -140,7 +141,6 @@ public class WindowingTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMergingWindowing() {
-    Pipeline p = TestPipeline.create();
     PCollection<String> input =
         p.apply(
             Create.timestamped(
@@ -162,7 +162,6 @@ public class WindowingTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowPreservation() {
-    Pipeline p = TestPipeline.create();
     PCollection<String> input1 = p.apply("Create12",
         Create.timestamped(
             TimestampedValue.of("a", new Instant(1)),
@@ -190,7 +189,6 @@ public class WindowingTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testEmptyInput() {
-    Pipeline p = TestPipeline.create();
     PCollection<String> input =
         p.apply(Create.<String>timestamped()
             .withCoder(StringUtf8Coder.of()));
@@ -218,7 +216,6 @@ public class WindowingTest implements Serializable {
       writer.println("d 11");
     }
 
-    Pipeline p = TestPipeline.create();
     PCollection<String> output = p.begin()
         .apply("ReadLines", TextIO.Read.from(filename))
         .apply(ParDo.of(new ExtractWordsWithTimestampsFn()))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
index d990ee0..d47cddc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableList;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -36,6 +35,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -65,10 +65,12 @@ public class ReshuffleTest {
         KV.of("k1", (Iterable<Integer>) ImmutableList.of(3)),
         KV.of("k2", (Iterable<Integer>) ImmutableList.of(4)));
 
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testJustReshuffle() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input = pipeline
         .apply(Create.of(ARBITRARY_KVS)
@@ -89,7 +91,6 @@ public class ReshuffleTest {
   @Test
   @Category(RunnableOnService.class)
   public void testReshuffleAfterSessionsAndGroupByKey() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
         .apply(Create.of(GBK_TESTABLE_KVS)
@@ -113,7 +114,6 @@ public class ReshuffleTest {
   @Test
   @Category(RunnableOnService.class)
   public void testReshuffleAfterFixedWindowsAndGroupByKey() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
         .apply(Create.of(GBK_TESTABLE_KVS)
@@ -137,7 +137,6 @@ public class ReshuffleTest {
   @Test
   @Category(RunnableOnService.class)
   public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
         .apply(Create.of(GBK_TESTABLE_KVS)
@@ -161,7 +160,6 @@ public class ReshuffleTest {
   @Test
   @Category(RunnableOnService.class)
   public void testReshuffleAfterFixedWindows() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input = pipeline
         .apply(Create.of(ARBITRARY_KVS)
@@ -185,7 +183,6 @@ public class ReshuffleTest {
   @Test
   @Category(RunnableOnService.class)
   public void testReshuffleAfterSlidingWindows() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input = pipeline
         .apply(Create.of(ARBITRARY_KVS)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 1467ae8..b5351da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -33,6 +32,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -41,9 +41,14 @@ import org.junit.runners.JUnit4;
 /** Unit tests for {@link PCollectionTuple}. */
 @RunWith(JUnit4.class)
 public final class PCollectionTupleTest implements Serializable {
+
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create()
+                                                             
.enableAbandonedNodeEnforcement(false);
+
   @Test
   public void testOfThenHas() {
-    Pipeline pipeline = TestPipeline.create();
+
     PCollection<Object> pCollection = PCollection.createPrimitiveOutputInternal(
         pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
     TupleTag<Object> tag = new TupleTag<>();
@@ -53,7 +58,6 @@ public final class PCollectionTupleTest implements Serializable {
 
   @Test
   public void testEmpty() {
-    Pipeline pipeline = TestPipeline.create();
     TupleTag<Object> tag = new TupleTag<>();
     assertFalse(PCollectionTuple.empty(pipeline).has(tag));
   }
@@ -61,7 +65,7 @@ public final class PCollectionTupleTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testComposePCollectionTuple() {
-    Pipeline pipeline = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(true);
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index e5f2019..ba7477d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.values;
 import static org.apache.beam.sdk.TestUtils.LINES;
 
 import java.io.File;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -40,6 +39,10 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class PDoneTest {
+
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
@@ -78,8 +81,6 @@ public class PDoneTest {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyTransform() {
-    Pipeline p = TestPipeline.create();
-
     p.begin().apply(new EmptyTransform());
 
     p.run();
@@ -94,8 +95,6 @@ public class PDoneTest {
     File tmpFile = tmpFolder.newFile("file.txt");
     String filename = tmpFile.getPath();
 
-    Pipeline p = TestPipeline.create();
-
     p.begin().apply(new SimpleTransform(filename));
 
     p.run();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index f33b3a2..8381f12 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -40,6 +39,10 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class TypedPValueTest {
+
+  @Rule
+  public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
@@ -51,9 +54,8 @@ public class TypedPValueTest {
     }
   }
 
-  private static PCollectionTuple buildPCollectionTupleWithTags(
+  private PCollectionTuple buildPCollectionTupleWithTags(
       TupleTag<Integer> mainOutputTag, TupleTag<Integer> sideOutputTag) {
-    Pipeline p = TestPipeline.create();
     PCollection<Integer> input = p.apply(Create.of(1, 2, 3));
     PCollectionTuple tuple = input.apply(
         ParDo
@@ -138,7 +140,6 @@ public class TypedPValueTest {
 
   @Test
   public void testParDoWithNoSideOutputsErrorDoesNotMentionTupleTag() {
-    Pipeline p = TestPipeline.create();
     PCollection<EmptyClass> input =
         p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn()));
 
@@ -158,7 +159,6 @@ public class TypedPValueTest {
 
   @Test
   public void testFinishSpecifyingShouldFailIfNoCoderInferrable() {
-    Pipeline p = TestPipeline.create();
     PCollection<EmptyClass> unencodable =
         p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn()));
 


Reply via email to