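Migrated the beam-sdks-java-io-java8tests module to use TestPipeline as a JUnit rule, and fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps (the timestamp lambda now derives the timestamp from its input element instead of a captured constant, the expected value for "1234" is built from a long rather than a string, and the pipeline is now actually run).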
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b23d42c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b23d42c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b23d42c Branch: refs/heads/python-sdk Commit: 4b23d42c31c95bed0d64bfc393fa193311e93498 Parents: fce4f65 Author: Stas Levin <stasle...@gmail.com> Authored: Tue Dec 20 18:57:57 2016 +0200 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Dec 20 09:55:46 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/CombineJava8Test.java | 8 +++----- .../apache/beam/sdk/transforms/DistinctJava8Test.java | 5 +++-- .../apache/beam/sdk/transforms/FilterJava8Test.java | 9 +++------ .../beam/sdk/transforms/FlatMapElementsJava8Test.java | 7 ++++--- .../beam/sdk/transforms/MapElementsJava8Test.java | 9 ++++++--- .../beam/sdk/transforms/PartitionJava8Test.java | 7 ++++--- .../apache/beam/sdk/transforms/WithKeysJava8Test.java | 6 ++++-- .../beam/sdk/transforms/WithTimestampsJava8Test.java | 14 ++++++++++---- 8 files changed, 37 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java index 98d99ce..a0f7ce6 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java @@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.not; import 
com.google.common.collect.Iterables; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -45,6 +44,9 @@ import org.junit.runners.JUnit4; public class CombineJava8Test implements Serializable { @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); /** @@ -65,7 +67,6 @@ public class CombineJava8Test implements Serializable { */ @Test public void testCombineGloballyLambda() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3, 4)) @@ -86,7 +87,6 @@ public class CombineJava8Test implements Serializable { */ @Test public void testCombineGloballyInstanceMethodReference() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3, 4)) @@ -101,7 +101,6 @@ public class CombineJava8Test implements Serializable { */ @Test public void testCombinePerKeyLambda() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> output = pipeline .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) @@ -125,7 +124,6 @@ public class CombineJava8Test implements Serializable { */ @Test public void testCombinePerKeyInstanceMethodReference() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> output = pipeline .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java index 99ef232..790f51e 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java @@ -44,11 +44,13 @@ import org.junit.runners.JUnit4; public class DistinctJava8Test { @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() { - TestPipeline p = TestPipeline.create(); Multimap<Integer, String> predupedContents = HashMultimap.create(); predupedContents.put(3, "foo"); @@ -76,7 +78,6 @@ public class DistinctJava8Test { @Test public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() { - TestPipeline p = TestPipeline.create(); Multimap<Integer, String> predupedContents = HashMultimap.create(); predupedContents.put(3, "foo"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java index afd1c8b..f91371e 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -39,12 +38,14 @@ import 
org.junit.runners.JUnit4; public class FilterJava8Test implements Serializable { @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test @Category(RunnableOnService.class) public void testIdentityFilterByPredicate() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) @@ -56,7 +57,6 @@ public class FilterJava8Test implements Serializable { @Test public void testNoFilterByPredicate() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 4, 5)) @@ -69,7 +69,6 @@ public class FilterJava8Test implements Serializable { @Test @Category(RunnableOnService.class) public void testFilterByPredicate() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) @@ -85,7 +84,6 @@ public class FilterJava8Test implements Serializable { */ @Test public void testFilterParDoOutputTypeDescriptorRaw() throws Exception { - Pipeline pipeline = TestPipeline.create(); @SuppressWarnings({"unchecked", "rawtypes"}) PCollection<String> output = pipeline @@ -99,7 +97,6 @@ public class FilterJava8Test implements Serializable { @Test @Category(RunnableOnService.class) public void testFilterByMethodReference() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java index 
70cc04d..471724d 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms; import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -38,6 +37,9 @@ import org.junit.runners.JUnit4; public class FlatMapElementsJava8Test implements Serializable { @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); /** @@ -46,7 +48,6 @@ public class FlatMapElementsJava8Test implements Serializable { */ @Test public void testFlatMapBasic() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) .apply(FlatMapElements @@ -63,7 +64,7 @@ public class FlatMapElementsJava8Test implements Serializable { */ @Test public void testFlatMapMethodReference() throws Exception { - Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) .apply(FlatMapElements http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java index 9b556b9..ce0f111 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java +++ 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java @@ -18,11 +18,11 @@ package org.apache.beam.sdk.transforms; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -33,13 +33,16 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class MapElementsJava8Test implements Serializable { + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + /** * Basic test of {@link MapElements} with a lambda (which is instantiated as a * {@link SerializableFunction}). */ @Test public void testMapBasic() throws Exception { - Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) .apply(MapElements @@ -56,7 +59,7 @@ public class MapElementsJava8Test implements Serializable { */ @Test public void testMapMethodReference() throws Exception { - Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) .apply(MapElements http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java index 0aeb41f..7d97740 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java @@ -20,7 +20,6 @@ package 
org.apache.beam.sdk.transforms; import static org.junit.Assert.assertEquals; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -39,11 +38,14 @@ import org.junit.runners.JUnit4; public class PartitionJava8Test implements Serializable { @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test public void testModPartition() { - Pipeline pipeline = TestPipeline.create(); + PCollectionList<Integer> outputs = pipeline .apply(Create.of(1, 2, 4, 5)) @@ -61,7 +63,6 @@ public class PartitionJava8Test implements Serializable { */ @Test public void testPartitionFnOutputTypeDescriptorRaw() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollectionList<String> output = pipeline .apply(Create.of("hello")) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java index a5b9cb1..6ba41fa 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java @@ -39,12 +39,15 @@ import org.junit.runners.JUnit4; public class WithKeysJava8Test { @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Rule public ExpectedException thrown = ExpectedException.none(); @Test @Category(RunnableOnService.class) public void withLambdaAndTypeDescriptorShouldSucceed() { - TestPipeline p = 
TestPipeline.create(); + PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12")); PCollection<KV<Integer, String>> kvs = values.apply( @@ -59,7 +62,6 @@ public class WithKeysJava8Test { @Test public void withLambdaAndNoTypeDescriptorShouldThrow() { - TestPipeline p = TestPipeline.create(); PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12")); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java index 5f1e74b..a0c6370 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -34,15 +35,18 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class WithTimestampsJava8Test implements Serializable { + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void withTimestampsLambdaShouldApplyTimestamps() { - TestPipeline p = TestPipeline.create(); - String yearTwoThousand = "946684800000"; + final String yearTwoThousand = "946684800000"; PCollection<String> timestamped = p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand)) - .apply(WithTimestamps.of((String input) -> new 
Instant(Long.valueOf(yearTwoThousand)))); + .apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(input)))); PCollection<KV<String, Instant>> timestampedVals = timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() { @@ -58,8 +62,10 @@ public class WithTimestampsJava8Test implements Serializable { PAssert.that(timestampedVals) .containsInAnyOrder( KV.of("0", new Instant(0)), - KV.of("1234", new Instant("1234")), + KV.of("1234", new Instant(Long.valueOf("1234"))), KV.of(Integer.toString(Integer.MAX_VALUE), new Instant(Integer.MAX_VALUE)), KV.of(yearTwoThousand, new Instant(Long.valueOf(yearTwoThousand)))); + + p.run(); } }