Migrated the beam-examples-java8 module to TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7106e880 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7106e880 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7106e880 Branch: refs/heads/master Commit: 7106e8806f32089eab36e6feee4928026bf52714 Parents: 8306899 Author: Stas Levin <[email protected]> Authored: Sun Dec 18 18:38:11 2016 +0200 Committer: Kenneth Knowles <[email protected]> Committed: Tue Dec 20 09:55:44 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/examples/MinimalWordCountJava8Test.java | 6 ++++-- .../beam/examples/complete/game/GameStatsTest.java | 7 ++++--- .../beam/examples/complete/game/HourlyTeamScoreTest.java | 5 +++-- .../beam/examples/complete/game/LeaderBoardTest.java | 11 +++++------ .../beam/examples/complete/game/UserScoreTest.java | 10 +++++----- 5 files changed, 21 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7106e880/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index f373343..c2f3efe 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -26,7 +26,6 @@ import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.testing.TestPipeline; @@ -38,6 +37,7 @@ import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptors; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,12 +52,14 @@ import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class MinimalWordCountJava8Test implements Serializable { + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + /** * A basic smoke test that ensures there is no crash at pipeline construction time. */ @Test public void testMinimalWordCountJava8() throws Exception { - Pipeline p = TestPipeline.create(); p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil()); p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7106e880/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java index 8e12c46..da2bb91 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import org.apache.beam.examples.complete.game.GameStats.CalculateSpammyUsers; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -29,6 +28,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -58,12 +58,13 @@ public class GameStatsTest implements Serializable { static final List<KV<String, Integer>> SPAMMERS = Arrays.asList( KV.of("Robot-2", 66), KV.of("Robot-1", 116)); + @Rule + public TestPipeline p = TestPipeline.create(); + /** Test the calculation of 'spammy users'. */ @Test @Category(RunnableOnService.class) public void testCalculateSpammyUsers() throws Exception { - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = p.apply(Create.of(USER_SCORES)); PCollection<KV<String, Integer>> output = input.apply(new CalculateSpammyUsers()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7106e880/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java index cc42d52..34a0744 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.examples.complete.game.UserScore.GameActionInfo; import org.apache.beam.examples.complete.game.UserScore.ParseEventFn; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -36,6 +35,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -81,12 +81,13 @@ public class HourlyTeamScoreTest implements Serializable { KV.of("user18_BananaEmu", 1), KV.of("user18_ApricotCaneToad", 14) }; + @Rule + public TestPipeline p = TestPipeline.create(); /** Test the filtering. */ @Test @Category(RunnableOnService.class) public void testUserScoresFilter() throws Exception { - Pipeline p = TestPipeline.create(); final Instant startMinTimestamp = new Instant(1447965680000L); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7106e880/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java index 2b51da1..745c210 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -54,6 +55,8 @@ public class LeaderBoardTest implements Serializable { private static final Duration TEAM_WINDOW_DURATION = Duration.standardMinutes(20); private Instant baseTime = new Instant(0); + @Rule + public TestPipeline p = TestPipeline.create(); /** * Some example users, on two separate teams. */ @@ -84,7 +87,6 @@ public class LeaderBoardTest implements Serializable { */ @Test public void testTeamScoresOnTime() { - TestPipeline p = TestPipeline.create(); TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class)) // Start at the epoch @@ -120,7 +122,6 @@ public class LeaderBoardTest implements Serializable { */ @Test public void testTeamScoresSpeculative() { - TestPipeline p = TestPipeline.create(); TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class)) // Start at the epoch @@ -170,7 +171,6 @@ public class LeaderBoardTest implements Serializable { */ @Test public void testTeamScoresUnobservablyLate() { - TestPipeline p = TestPipeline.create(); BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION); TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class)) @@ -208,7 +208,6 @@ public class LeaderBoardTest implements Serializable { */ @Test public void testTeamScoresObservablyLate() { - TestPipeline p = TestPipeline.create(); Instant firstWindowCloses = baseTime.plus(ALLOWED_LATENESS).plus(TEAM_WINDOW_DURATION); TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class)) @@ -268,7 +267,6 @@ public class LeaderBoardTest implements Serializable { */ @Test public void testTeamScoresDroppablyLate() { - TestPipeline p = TestPipeline.create(); BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION); TestStream<GameActionInfo> infos = TestStream.create(AvroCoder.of(GameActionInfo.class)) @@ -299,6 +297,8 @@ public class LeaderBoardTest implements Serializable { // No elements are added before the watermark passes the end of the window plus the allowed // lateness, so no refinement should be emitted PAssert.that(teamScores).inFinalPane(window).empty(); + + p.run().waitUntilFinish(); } /** @@ -308,7 +308,6 @@ public class LeaderBoardTest implements Serializable { */ @Test public void testUserScore() { - TestPipeline p = TestPipeline.create(); TestStream<GameActionInfo> infos = TestStream.create(AvroCoder.of(GameActionInfo.class)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7106e880/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java index 39de333..3b77b26 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.beam.examples.complete.game.UserScore.ExtractAndSumScore; import org.apache.beam.examples.complete.game.UserScore.GameActionInfo; import org.apache.beam.examples.complete.game.UserScore.ParseEventFn; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -36,6 +35,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -81,7 +81,10 @@ public class UserScoreTest implements Serializable { KV.of("AndroidGreenKookaburra", 23), KV.of("BisqueBilby", 14)); - /** Test the {@link ParseEventFn} {@link DoFn}. */ + @Rule + public TestPipeline p = TestPipeline.create(); + + /** Test the {@link ParseEventFn} {@link org.apache.beam.sdk.transforms.DoFn}. */ @Test public void testParseEventFn() throws Exception { DoFnTester<String, GameActionInfo> parseEventFn = @@ -98,7 +101,6 @@ public class UserScoreTest implements Serializable { @Test @Category(RunnableOnService.class) public void testUserScoreSums() throws Exception { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of())); @@ -117,7 +119,6 @@ public class UserScoreTest implements Serializable { @Test @Category(RunnableOnService.class) public void testTeamScoreSums() throws Exception { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of())); @@ -136,7 +137,6 @@ public class UserScoreTest implements Serializable { @Test @Category(RunnableOnService.class) public void testUserScoresBadInput() throws Exception { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(GAME_EVENTS2).withCoder(StringUtf8Coder.of()));
