Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/950aa7e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/950aa7e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/950aa7e1

Branch: refs/heads/python-sdk
Commit: 950aa7e1d9c50167933eb192a16e15700e483377
Parents: 12be8b1
Author: Stas Levin <stasle...@gmail.com>
Authored: Tue Dec 20 17:44:15 2016 +0200
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Dec 20 09:55:46 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java  | 7 +++++--
 .../java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java | 6 ++++--
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index f0ab46c..075805e 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -22,19 +22,23 @@ import static com.google.common.collect.Lists.newArrayList;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.common.collect.Iterables;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.DateTime;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
  * Tests {@link AmazonKinesisMock}.
  */
 public class KinesisMockReadTest {
+
+    @Rule
+    public final transient TestPipeline p = TestPipeline.create();
+
     @Test
     public void readsDataFromMockKinesis() {
         int noOfShards = 3;
@@ -42,7 +46,6 @@ public class KinesisMockReadTest {
         List<List<AmazonKinesisMock.TestData>> testData =
                 provideTestData(noOfShards, noOfEventsPerShard);
 
-        final Pipeline p = TestPipeline.create();
         PCollection<AmazonKinesisMock.TestData> result = p.
                 apply(
                         KinesisIO.Read.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 73a2455..690cc11 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -43,6 +42,7 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -53,6 +53,8 @@ public class KinesisReaderIT {
 
     private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
     private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
+    @Rule
+    public final transient TestPipeline p = TestPipeline.create();
 
     @Ignore
     @Test
@@ -76,7 +78,7 @@ public class KinesisReaderIT {
 
     private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
             throws InterruptedException {
-        final Pipeline p = TestPipeline.create();
+
         PCollection<String> result = p.
                 apply(KinesisIO.Read.
                         from(options.getAwsKinesisStream(), Instant.now()).