Migrated the beam-runners-direct-java module to use TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09c404a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09c404a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09c404a6 Branch: refs/heads/python-sdk Commit: 09c404a6c407898fcbc2fd22797cba4da8839b93 Parents: b671025 Author: Stas Levin <stasle...@gmail.com> Authored: Mon Dec 19 10:20:16 2016 +0200 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Dec 20 09:55:45 2016 -0800 ---------------------------------------------------------------------- .../direct/BoundedReadEvaluatorFactoryTest.java | 13 ++++++------- .../beam/runners/direct/CloningBundleFactoryTest.java | 8 ++------ .../beam/runners/direct/CommittedResultTest.java | 6 +++++- .../CopyOnAccessInMemoryStateInternalsTest.java | 11 +++++++++-- .../beam/runners/direct/DirectGraphVisitorTest.java | 3 ++- .../beam/runners/direct/EvaluationContextTest.java | 7 ++++--- .../runners/direct/FlattenEvaluatorFactoryTest.java | 6 ++++-- .../direct/GroupByKeyEvaluatorFactoryTest.java | 5 ++++- .../direct/GroupByKeyOnlyEvaluatorFactoryTest.java | 5 ++++- .../direct/ImmutabilityCheckingBundleFactoryTest.java | 4 +++- .../direct/ImmutabilityEnforcementFactoryTest.java | 3 ++- .../direct/ImmutableListBundleFactoryTest.java | 14 +++++++++++--- .../direct/KeyedPValueTrackingVisitorTest.java | 6 +++--- .../beam/runners/direct/ParDoEvaluatorTest.java | 5 ++++- .../beam/runners/direct/SideInputContainerTest.java | 5 +++-- .../direct/StatefulParDoEvaluatorFactoryTest.java | 7 +++++-- .../beam/runners/direct/StepTransformResultTest.java | 5 ++++- .../direct/TestStreamEvaluatorFactoryTest.java | 5 ++++- .../beam/runners/direct/TransformExecutorTest.java | 4 +++- .../direct/UnboundedReadEvaluatorFactoryTest.java | 9 ++++----- .../beam/runners/direct/ViewEvaluatorFactoryTest.java | 5 ++++- .../runners/direct/WatermarkCallbackExecutorTest.java | 5 ++++- 
.../beam/runners/direct/WatermarkManagerTest.java | 6 ++++-- .../runners/direct/WindowEvaluatorFactoryTest.java | 5 ++++- .../runners/direct/WriteWithShardingFactoryTest.java | 14 +++++++------- 25 files changed, 109 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index acb1444..97eae27 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -62,6 +62,7 @@ import org.apache.beam.sdk.values.PCollection; import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -82,11 +83,13 @@ public class BoundedReadEvaluatorFactoryTest { private BundleFactory bundleFactory; private AppliedPTransform<?, ?, ?> longsProducer; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { MockitoAnnotations.initMocks(this); source = CountingSource.upTo(10L); - TestPipeline p = TestPipeline.create(); longs = p.apply(Read.from(source)); factory = @@ -142,7 +145,7 @@ public class BoundedReadEvaluatorFactoryTest { elems[i] = (long) i; } PCollection<Long> read = - TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems))); + p.apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, 
elems))); AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read); Collection<CommittedBundle<?>> unreadInputs = new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1); @@ -191,8 +194,7 @@ public class BoundedReadEvaluatorFactoryTest { BoundedReadEvaluatorFactory factory = new BoundedReadEvaluatorFactory(context, 0L); PCollection<Long> read = - TestPipeline.create() - .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L)))); + p.apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L)))); AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); @@ -298,8 +300,6 @@ public class BoundedReadEvaluatorFactoryTest { @Test public void boundedSourceEvaluatorClosesReader() throws Exception { TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L); - - TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection); @@ -320,7 +320,6 @@ public class BoundedReadEvaluatorFactoryTest { public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception { TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of()); - TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java index 
bafab59..e5299a2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -62,6 +62,8 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class CloningBundleFactoryTest { @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + private CloningBundleFactory factory = CloningBundleFactory.create(); @Test @@ -76,7 +78,6 @@ public class CloningBundleFactoryTest { @Test public void bundleWorkingCoderSucceedsClonesOutput() { - TestPipeline p = TestPipeline.create(); PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of())); PCollection<KV<String, Integer>> kvs = created @@ -101,7 +102,6 @@ public class CloningBundleFactoryTest { @Test public void keyedBundleWorkingCoderSucceedsClonesOutput() { - TestPipeline p = TestPipeline.create(); PCollection<Integer> created = p.apply(Create.of(1, 3).withCoder(VarIntCoder.of())); PCollection<KV<String, Iterable<Integer>>> keyed = @@ -130,7 +130,6 @@ public class CloningBundleFactoryTest { @Test public void bundleEncodeFailsAddFails() { - TestPipeline p = TestPipeline.create(); PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder())); UncommittedBundle<Record> bundle = factory.createBundle(pc); @@ -142,7 +141,6 @@ public class CloningBundleFactoryTest { @Test public void bundleDecodeFailsAddFails() { - TestPipeline p = TestPipeline.create(); PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder())); UncommittedBundle<Record> bundle = factory.createBundle(pc); @@ -154,7 +152,6 @@ public class CloningBundleFactoryTest { @Test public void keyedBundleEncodeFailsAddFails() { - TestPipeline p = TestPipeline.create(); PCollection<Record> pc = 
p.apply(Create.<Record>of().withCoder(new RecordNoEncodeCoder())); UncommittedBundle<Record> bundle = factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc); @@ -167,7 +164,6 @@ public class CloningBundleFactoryTest { @Test public void keyedBundleDecodeFailsAddFails() { - TestPipeline p = TestPipeline.create(); PCollection<Record> pc = p.apply(Create.<Record>of().withCoder(new RecordNoDecodeCoder())); UncommittedBundle<Record> bundle = factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java index c6986c0..736f554 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.hamcrest.Matchers; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -47,7 +48,10 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class CommittedResultTest implements Serializable { - private transient TestPipeline p = TestPipeline.create(); + + @Rule + public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + private transient PCollection<Integer> created = p.apply(Create.of(1, 2)); private transient AppliedPTransform<?, ?, ?> transform = AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() { 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index deefc68..35245f4 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -61,8 +61,15 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class CopyOnAccessInMemoryStateInternalsTest { + + @Rule public TestPipeline pipeline = TestPipeline.create(); @Rule public ExpectedException thrown = ExpectedException.none(); private String key = "foo"; + + public CopyOnAccessInMemoryStateInternalsTest() { + pipeline = TestPipeline.create(); + } + @Test public void testGetWithEmpty() { CopyOnAccessInMemoryStateInternals<String> internals = @@ -167,7 +174,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn(); StateNamespace namespace = new StateNamespaceForTest("foo"); - CoderRegistry reg = TestPipeline.create().getCoderRegistry(); + CoderRegistry reg = pipeline.getCoderRegistry(); StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag = StateTags.combiningValue("summer", sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn); @@ -197,7 +204,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { KeyedCombineFn<String, Long, long[], Long> sumLongFn = new Sum.SumLongFn().asKeyedFn(); StateNamespace namespace = new StateNamespaceForTest("foo"); - CoderRegistry reg = 
TestPipeline.create().getCoderRegistry(); + CoderRegistry reg = pipeline.getCoderRegistry(); StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag = StateTags.keyedCombiningValue( "summer", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java index b88c9a4..c3bbe2d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java @@ -60,8 +60,9 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class DirectGraphVisitorTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient TestPipeline p = TestPipeline.create() + .enableAbandonedNodeEnforcement(false); - private transient TestPipeline p = TestPipeline.create(); private transient DirectGraphVisitor visitor = new DirectGraphVisitor(); @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index f11c370..bf36204 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -68,6 +68,7 
@@ import org.apache.beam.sdk.values.PCollectionView; import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -77,7 +78,6 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class EvaluationContextTest { - private TestPipeline p; private EvaluationContext context; private PCollection<Integer> created; @@ -92,13 +92,14 @@ public class EvaluationContextTest { private AppliedPTransform<?, ?, ?> viewProducer; private AppliedPTransform<?, ?, ?> unboundedProducer; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { DirectRunner runner = DirectRunner.fromOptions(PipelineOptionsFactory.create()); - p = TestPipeline.create(); - created = p.apply(Create.of(1, 2, 3)); downstream = created.apply(WithKeys.<String, Integer>of("foo")); view = created.apply(View.<Integer>asIterable()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 9e22c36..cda68f0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.hamcrest.Matchers; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ 
-48,9 +49,11 @@ import org.junit.runners.JUnit4; public class FlattenEvaluatorFactoryTest { private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Test public void testFlattenInMemoryEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); PCollection<Integer> left = p.apply("left", Create.of(1, 2, 4)); PCollection<Integer> right = p.apply("right", Create.of(-1, 2, -4)); PCollectionList<Integer> list = PCollectionList.of(left).and(right); @@ -118,7 +121,6 @@ public class FlattenEvaluatorFactoryTest { @Test public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception { - TestPipeline p = TestPipeline.create(); PCollectionList<Integer> list = PCollectionList.empty(p); PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index f0b29f0..fefafd0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PCollection; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,9 +53,11 @@ import org.junit.runners.JUnit4; public class GroupByKeyEvaluatorFactoryTest { private 
BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Test public void testInMemoryEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); KV<String, Integer> firstFoo = KV.of("foo", -1); KV<String, Integer> secondFoo = KV.of("foo", 1); KV<String, Integer> thirdFoo = KV.of("foo", 3); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java index 7efdb3d..94514ad 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PCollection; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,9 +53,11 @@ import org.junit.runners.JUnit4; public class GroupByKeyOnlyEvaluatorFactoryTest { private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Test public void testInMemoryEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); KV<String, Integer> firstFoo = KV.of("foo", -1); KV<String, Integer> secondFoo = KV.of("foo", 1); KV<String, Integer> thirdFoo = KV.of("foo", 3); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index 6ab8aea..2448078 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -46,14 +46,16 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class ImmutabilityCheckingBundleFactoryTest { + + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Rule public ExpectedException thrown = ExpectedException.none(); private ImmutabilityCheckingBundleFactory factory; private PCollection<byte[]> created; private PCollection<byte[]> transformed; + @Before public void setup() { - TestPipeline p = TestPipeline.create(); created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of())); transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>())); DirectGraphVisitor visitor = new DirectGraphVisitor(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index 1ad6ba6..cd3e9b4 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -42,6 +42,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class ImmutabilityEnforcementFactoryTest implements Serializable { + @Rule public transient TestPipeline p = + TestPipeline.create().enableAbandonedNodeEnforcement(false); @Rule public transient ExpectedException thrown = ExpectedException.none(); private transient ImmutabilityEnforcementFactory factory; private transient BundleFactory bundleFactory; @@ -52,7 +54,6 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { public void setup() { factory = new ImmutabilityEnforcementFactory(); bundleFactory = ImmutableListBundleFactory.create(); - TestPipeline p = TestPipeline.create(); pcollection = p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) .apply( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java index a36c408..46f02cd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java @@ -57,6 +57,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class ImmutableListBundleFactoryTest { + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Rule public ExpectedException thrown = ExpectedException.none(); private 
ImmutableListBundleFactory bundleFactory = ImmutableListBundleFactory.create(); @@ -66,13 +67,12 @@ public class ImmutableListBundleFactoryTest { @Before public void setup() { - TestPipeline p = TestPipeline.create(); created = p.apply(Create.of(1, 2, 3)); downstream = created.apply(WithKeys.<String, Integer>of("foo")); } private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception { - PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1)); + PCollection<Integer> pcollection = p.apply("Create", Create.of(1)); StructuralKey<?> skey = StructuralKey.of(key, coder); UncommittedBundle<Integer> inFlightBundle = bundleFactory.createKeyedBundle(skey, pcollection); @@ -87,9 +87,17 @@ public class ImmutableListBundleFactoryTest { } @Test - public void keyedWithKeyShouldCreateKeyedBundle() throws Exception { + public void keyedWithStringKeyShouldCreateKeyedBundle() throws Exception { createKeyedBundle(StringUtf8Coder.of(), "foo"); + } + + @Test + public void keyedWithVarIntKeyShouldCreateKeyedBundle() throws Exception { createKeyedBundle(VarIntCoder.of(), 1234); + } + + @Test + public void keyedWithByteArrayKeyShouldCreateKeyedBundle() throws Exception { createKeyedBundle(ByteArrayCoder.of(), new byte[] {0, 2, 4, 99}); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java index 0852cd3..eef3375 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -24,7 +24,6 @@ import static 
org.junit.Assert.assertThat; import com.google.common.collect.ImmutableSet; import java.util.Collections; import java.util.Set; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -53,11 +52,12 @@ public class KeyedPValueTrackingVisitorTest { @Rule public ExpectedException thrown = ExpectedException.none(); private KeyedPValueTrackingVisitor visitor; - private Pipeline p; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Before public void setup() { - p = TestPipeline.create(); + @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> producesKeyed = ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index d48ac14..1a3207b 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -72,10 +73,12 @@ public class ParDoEvaluatorTest { private List<TupleTag<?>> sideOutputTags; private BundleFactory bundleFactory; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { 
MockitoAnnotations.initMocks(this); - TestPipeline p = TestPipeline.create(); inputPc = p.apply(Create.of(1, 2, 3)); mainOutputTag = new TupleTag<Integer>() {}; sideOutputTags = TupleTagList.empty().getAll(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java index cc7d88a..183decd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java @@ -96,12 +96,14 @@ public class SideInputContainerTest { }; @Rule + public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + @Rule public ExpectedException thrown = ExpectedException.none(); @Mock private EvaluationContext context; - private TestPipeline pipeline; private SideInputContainer container; @@ -114,7 +116,6 @@ public class SideInputContainerTest { @Before public void setup() { MockitoAnnotations.initMocks(this); - pipeline = TestPipeline.create(); PCollection<Integer> create = pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index 326310b..d312aa3 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -67,6 +67,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -91,6 +92,10 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create(); + @Rule + public transient TestPipeline pipeline = + TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { MockitoAnnotations.initMocks(this); @@ -106,7 +111,6 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { public void windowCleanupScheduled() throws Exception { // To test the factory, first we set up a pipeline and then we use the constructed // pipeline to create the right parameters to pass to the factory - TestPipeline pipeline = TestPipeline.create(); final String stateId = "my-state-id"; @@ -208,7 +212,6 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { public void testUnprocessedElements() throws Exception { // To test the factory, first we set up a pipeline and then we use the constructed // pipeline to create the right parameters to pass to the factory - TestPipeline pipeline = TestPipeline.create(); final String stateId = "my-state-id"; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java index d3a2cca..0d94b7a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.Matchers; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,9 +45,11 @@ public class StepTransformResultTest { private BundleFactory bundleFactory; private PCollection<Integer> pc; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { - TestPipeline p = TestPipeline.create(); pc = p.apply(Create.of(1, 2, 3)); transform = DirectGraphs.getGraph(p).getProducer(pc); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java index 6bb8623..c5b3b3d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java @@ -41,6 +41,7 @@ import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,6 +53,9 @@ public class TestStreamEvaluatorFactoryTest 
{ private BundleFactory bundleFactory; private EvaluationContext context; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { context = mock(EvaluationContext.class); @@ -62,7 +66,6 @@ public class TestStreamEvaluatorFactoryTest { /** Demonstrates that returned evaluators produce elements in sequence. */ @Test public void producesElementsInSequence() throws Exception { - TestPipeline p = TestPipeline.create(); PCollection<Integer> streamVals = p.apply( TestStream.create(VarIntCoder.of()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 4ad22bc..e66ffcf 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -73,6 +73,9 @@ public class TransformExecutorTest { @Mock private EvaluationContext evaluationContext; @Mock private TransformEvaluatorRegistry registry; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { MockitoAnnotations.initMocks(this); @@ -85,7 +88,6 @@ public class TransformExecutorTest { evaluatorCompleted = new CountDownLatch(1); completionCallback = new RegisteringCompletionCallback(evaluatorCompleted); - TestPipeline p = TestPipeline.create(); created = p.apply(Create.of("foo", "spam", "third")); PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3)); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index dd36a2f..92d668e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -72,6 +72,7 @@ import org.joda.time.DateTime; import org.joda.time.Instant; import org.joda.time.ReadableInstant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -93,10 +94,12 @@ public class UnboundedReadEvaluatorFactoryTest { private UnboundedSource<Long, ?> source; private DirectGraph graph; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); - TestPipeline p = TestPipeline.create(); longs = p.apply(Read.from(source)); context = mock(EvaluationContext.class); @@ -190,7 +193,6 @@ public class UnboundedReadEvaluatorFactoryTest { new TestUnboundedSource<>(BigEndianLongCoder.of(), outputs); source.dedupes = true; - TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection); @@ -231,7 +233,6 @@ public class UnboundedReadEvaluatorFactoryTest { @Test public void noElementsAvailableReaderIncludedInResidual() throws Exception { - TestPipeline p = TestPipeline.create(); // Read with a very slow rate so by the second read 
there are no more elements PCollection<Long> pcollection = p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L))); @@ -291,7 +292,6 @@ public class UnboundedReadEvaluatorFactoryTest { TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0])); - TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); DirectGraph graph = DirectGraphs.getGraph(p); AppliedPTransform<?, ?, ?> sourceTransform = @@ -337,7 +337,6 @@ public class UnboundedReadEvaluatorFactoryTest { TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0])); - TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getGraph(p).getProducer(pcollection); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index 7c08009..6baf55a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,9 +53,11 @@ import org.junit.runners.JUnit4; public class ViewEvaluatorFactoryTest { private BundleFactory bundleFactory = 
ImmutableListBundleFactory.create(); + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Test public void testInMemoryEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of("foo", "bar")); CreatePCollectionView<String, Iterable<String>> createView = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java index acdabb6..8d6e73f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -51,9 +52,11 @@ public class WatermarkCallbackExecutorTest { private AppliedPTransform<?, ?, ?> create; private AppliedPTransform<?, ?, ?> sum; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { - TestPipeline p = TestPipeline.create(); PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); PCollection<Integer> summed = created.apply(Sum.integersGlobally()); DirectGraph graph = DirectGraphs.getGraph(p); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java 
---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index eb4d0cd..abc8a28 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -70,6 +70,7 @@ import org.hamcrest.Matchers; import org.joda.time.Instant; import org.joda.time.ReadableInstant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -94,9 +95,11 @@ public class WatermarkManagerTest implements Serializable { private transient BundleFactory bundleFactory; private DirectGraph graph; + @Rule + public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { - TestPipeline p = TestPipeline.create(); createdInts = p.apply("createdInts", Create.of(1, 2, 3)); @@ -278,7 +281,6 @@ public class WatermarkManagerTest implements Serializable { */ @Test public void getWatermarkMultiIdenticalInput() { - TestPipeline p = TestPipeline.create(); PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); PCollection<Integer> multiConsumer = PCollectionList.of(created).and(created).apply(Flatten.<Integer>pCollections()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index 66c28ce..9d0c68d 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -52,6 +52,7 @@ import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -96,10 +97,12 @@ public class WindowEvaluatorFactoryTest { ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2), PaneInfo.NO_FIRING); + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { MockitoAnnotations.initMocks(this); - TestPipeline p = TestPipeline.create(); input = p.apply(Create.of(1L, 2L, 3L)); bundleFactory = ImmutableListBundleFactory.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09c404a6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index aeb75ed..a8c4c02 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -69,6 +69,7 @@ public class WriteWithShardingFactoryTest { public static final int INPUT_SIZE = 10000; @Rule public TemporaryFolder tmp = new TemporaryFolder(); private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>(); + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Test public void dynamicallyReshardedWrite() throws 
Exception { @@ -81,7 +82,6 @@ public class WriteWithShardingFactoryTest { String fileName = "resharded_write"; String outputPath = tmp.getRoot().getAbsolutePath(); String targetLocation = IOChannelUtils.resolve(outputPath, fileName); - TestPipeline p = TestPipeline.create(); // TextIO is implemented in terms of the Write PTransform. When sharding is not specified, // resharding should be automatically applied p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation)); @@ -134,7 +134,7 @@ public class WriteWithShardingFactoryTest { public void keyBasedOnCountFnWithOneElement() throws Exception { PCollectionView<Long> elementCountView = PCollectionViews.singletonView( - TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); + p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0); DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn); @@ -149,7 +149,7 @@ public class WriteWithShardingFactoryTest { public void keyBasedOnCountFnWithTwoElements() throws Exception { PCollectionView<Long> elementCountView = PCollectionViews.singletonView( - TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); + p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0); DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn); @@ -167,7 +167,7 @@ public class WriteWithShardingFactoryTest { public void keyBasedOnCountFnFewElementsThreeShards() throws Exception { PCollectionView<Long> elementCountView = PCollectionViews.singletonView( - TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); + p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0); DoFnTester<String, KV<Integer, String>> fnTester = 
DoFnTester.of(fn); @@ -191,7 +191,7 @@ public class WriteWithShardingFactoryTest { public void keyBasedOnCountFnManyElements() throws Exception { PCollectionView<Long> elementCountView = PCollectionViews.singletonView( - TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); + p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 0); DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn); @@ -214,7 +214,7 @@ public class WriteWithShardingFactoryTest { public void keyBasedOnCountFnFewElementsExtraShards() throws Exception { PCollectionView<Long> elementCountView = PCollectionViews.singletonView( - TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); + p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 10); DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn); @@ -238,7 +238,7 @@ public class WriteWithShardingFactoryTest { public void keyBasedOnCountFnManyElementsExtraShards() throws Exception { PCollectionView<Long> elementCountView = PCollectionViews.singletonView( - TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); + p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); KeyBasedOnCountFn<String> fn = new KeyBasedOnCountFn<>(elementCountView, 3); DoFnTester<String, KV<Integer, String>> fnTester = DoFnTester.of(fn);