Repository: incubator-beam Updated Branches: refs/heads/master ae06f759f -> aeff1d5c2
Demonstrate PubsubIO with NVP Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f9225981 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f9225981 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f9225981 Branch: refs/heads/master Commit: f92259814964fb4d3b2381187247b3f11b5fe33f Parents: ae06f75 Author: Sam McVeety <s...@google.com> Authored: Sat Oct 29 19:02:51 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Nov 28 21:14:33 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/PubsubIO.java | 176 ++++++++++++++++--- .../apache/beam/sdk/io/PubsubUnboundedSink.java | 23 ++- .../beam/sdk/io/PubsubUnboundedSource.java | 40 +++-- .../org/apache/beam/sdk/io/PubsubIOTest.java | 43 +++-- .../beam/sdk/io/PubsubUnboundedSinkTest.java | 20 ++- .../beam/sdk/io/PubsubUnboundedSourceTest.java | 14 +- 6 files changed, 232 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 72a6399..9768788 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -31,11 +31,15 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.util.CoderUtils; @@ -134,7 +138,7 @@ public class PubsubIO { * Populate common {@link DisplayData} between Pubsub source and sink. */ private static void populateCommonDisplayData(DisplayData.Builder builder, - String timestampLabel, String idLabel, PubsubTopic topic) { + String timestampLabel, String idLabel, String topic) { builder .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel) .withLabel("Timestamp Label Attribute")) @@ -142,7 +146,7 @@ public class PubsubIO { .withLabel("ID Label Attribute")); if (topic != null) { - builder.add(DisplayData.item("topic", topic.asPath()) + builder.add(DisplayData.item("topic", topic) .withLabel("Pubsub Topic")); } } @@ -253,6 +257,61 @@ public class PubsubIO { } /** + * Used to build a {@link ValueProvider} for {@link PubsubSubscription}. + */ + private static class SubscriptionTranslator + implements SerializableFunction<String, PubsubSubscription> { + @Override + public PubsubSubscription apply(String from) { + return PubsubSubscription.fromPath(from); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link SubscriptionPath}. + */ + private static class SubscriptionPathTranslator + implements SerializableFunction<PubsubSubscription, SubscriptionPath> { + @Override + public SubscriptionPath apply(PubsubSubscription from) { + return PubsubClient.subscriptionPathFromName(from.project, from.subscription); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link PubsubTopic}. + */ + private static class TopicTranslator + implements SerializableFunction<String, PubsubTopic> { + @Override + public PubsubTopic apply(String from) { + return PubsubTopic.fromPath(from); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link TopicPath}. + */ + private static class TopicPathTranslator + implements SerializableFunction<PubsubTopic, TopicPath> { + @Override + public TopicPath apply(PubsubTopic from) { + return PubsubClient.topicPathFromName(from.project, from.topic); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link ProjectPath}. + */ + private static class ProjectPathTranslator + implements SerializableFunction<PubsubTopic, ProjectPath> { + @Override + public ProjectPath apply(PubsubTopic from) { + return PubsubClient.projectPathFromId(from.project); + } + } + + /** * Class representing a Cloud Pub/Sub Topic. */ public static class PubsubTopic implements Serializable { @@ -380,6 +439,13 @@ public class PubsubIO { * by the runner. */ public static Bound<String> topic(String topic) { + return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public static Bound<String> topic(ValueProvider<String> topic) { return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic); } @@ -391,6 +457,13 @@ public class PubsubIO { * of the {@code subscription} string. */ public static Bound<String> subscription(String subscription) { + return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(StaticValueProvider.of(subscription)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public static Bound<String> subscription(ValueProvider<String> subscription) { return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(subscription); } @@ -484,10 +557,10 @@ public class PubsubIO { */ public static class Bound<T> extends PTransform<PBegin, PCollection<T>> { /** The Cloud Pub/Sub topic to read from. */ - @Nullable private final PubsubTopic topic; + @Nullable private final ValueProvider<PubsubTopic> topic; /** The Cloud Pub/Sub subscription to read from. */ - @Nullable private final PubsubSubscription subscription; + @Nullable private final ValueProvider<PubsubSubscription> subscription; /** The name of the message attribute to read timestamps from. */ @Nullable private final String timestampLabel; @@ -508,9 +581,9 @@ public class PubsubIO { this(null, null, null, null, coder, null, 0, null); } - private Bound(String name, PubsubSubscription subscription, PubsubTopic topic, - String timestampLabel, Coder<T> coder, String idLabel, int maxNumRecords, - Duration maxReadTime) { + private Bound(String name, ValueProvider<PubsubSubscription> subscription, + ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder, + String idLabel, int maxNumRecords, Duration maxReadTime) { super(name); this.subscription = subscription; this.topic = topic; @@ -535,8 +608,16 @@ public class PubsubIO { * <p>Does not modify this object. */ public Bound<T> subscription(String subscription) { - return new Bound<>(name, PubsubSubscription.fromPath(subscription), topic, timestampLabel, - coder, idLabel, maxNumRecords, maxReadTime); + return subscription(StaticValueProvider.of(subscription)); + } + + /** + * Like {@code subscription()} but with a {@link ValueProvider}. + */ + public Bound<T> subscription(ValueProvider<String> subscription) { + return new Bound<>(name, + NestedValueProvider.of(subscription, new SubscriptionTranslator()), + topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime); } /** @@ -548,8 +629,16 @@ public class PubsubIO { * <p>Does not modify this object. */ public Bound<T> topic(String topic) { - return new Bound<>(name, subscription, PubsubTopic.fromPath(topic), timestampLabel, coder, - idLabel, maxNumRecords, maxReadTime); + return topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public Bound<T> topic(ValueProvider<String> topic) { + return new Bound<>(name, subscription, + NestedValueProvider.of(topic, new TopicTranslator()), + timestampLabel, coder, idLabel, maxNumRecords, maxReadTime); } /** @@ -629,15 +718,14 @@ public class PubsubIO { .apply(ParDo.of(new PubsubBoundedReader())) .setCoder(coder); } else { - @Nullable ProjectPath projectPath = - topic == null ? null : PubsubClient.projectPathFromId(topic.project); - @Nullable TopicPath topicPath = - topic == null ? null : PubsubClient.topicPathFromName(topic.project, topic.topic); - @Nullable SubscriptionPath subscriptionPath = + @Nullable ValueProvider<ProjectPath> projectPath = + topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator()); + @Nullable ValueProvider<TopicPath> topicPath = + topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator()); + @Nullable ValueProvider<SubscriptionPath> subscriptionPath = subscription == null ? null - : PubsubClient.subscriptionPathFromName( - subscription.project, subscription.subscription); + : NestedValueProvider.of(subscription, new SubscriptionPathTranslator()); return input.getPipeline().begin() .apply(new PubsubUnboundedSource<T>( FACTORY, projectPath, topicPath, subscriptionPath, @@ -648,7 +736,11 @@ public class PubsubIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - populateCommonDisplayData(builder, timestampLabel, idLabel, topic); + String topicString = + topic == null ? null + : topic.isAccessible() ? topic.get().asPath() + : topic.toString(); + populateCommonDisplayData(builder, timestampLabel, idLabel, topicString); builder .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime) @@ -657,8 +749,10 @@ public class PubsubIO { .withLabel("Maximum Read Records"), 0); if (subscription != null) { - builder.add(DisplayData.item("subscription", subscription.asPath()) - .withLabel("Pubsub Subscription")); + String subscriptionString = subscription.isAccessible() + ? subscription.get().asPath() : subscription.toString(); + builder.add(DisplayData.item("subscription", subscriptionString) + .withLabel("Pubsub Subscription")); } } @@ -668,10 +762,18 @@ public class PubsubIO { } public PubsubTopic getTopic() { + return topic == null ? null : topic.get(); + } + + public ValueProvider<PubsubTopic> getTopicProvider() { return topic; } public PubsubSubscription getSubscription() { + return subscription == null ? null : subscription.get(); + } + + public ValueProvider<PubsubSubscription> getSubscriptionProvider() { return subscription; } @@ -820,6 +922,13 @@ public class PubsubIO { * {@code topic} string. */ public static Bound<String> topic(String topic) { + return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public static Bound<String> topic(ValueProvider<String> topic) { return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic); } @@ -869,7 +978,7 @@ public class PubsubIO { */ public static class Bound<T> extends PTransform<PCollection<T>, PDone> { /** The Cloud Pub/Sub topic to publish to. */ - @Nullable private final PubsubTopic topic; + @Nullable private final ValueProvider<PubsubTopic> topic; /** The name of the message attribute to publish message timestamps in. */ @Nullable private final String timestampLabel; /** The name of the message attribute to publish unique message IDs in. */ @@ -881,7 +990,8 @@ public class PubsubIO { } private Bound( - String name, PubsubTopic topic, String timestampLabel, String idLabel, Coder<T> coder) { + String name, ValueProvider<PubsubTopic> topic, String timestampLabel, + String idLabel, Coder<T> coder) { super(name); this.topic = topic; this.timestampLabel = timestampLabel; @@ -899,7 +1009,15 @@ public class PubsubIO { * <p>Does not modify this object. */ public Bound<T> topic(String topic) { - return new Bound<>(name, PubsubTopic.fromPath(topic), timestampLabel, idLabel, coder); + return topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public Bound<T> topic(ValueProvider<String> topic) { + return new Bound<>(name, NestedValueProvider.of(topic, new TopicTranslator()), + timestampLabel, idLabel, coder); } /** @@ -950,7 +1068,7 @@ public class PubsubIO { case UNBOUNDED: return input.apply(new PubsubUnboundedSink<T>( FACTORY, - PubsubClient.topicPathFromName(topic.project, topic.topic), + NestedValueProvider.of(topic, new TopicPathTranslator()), coder, timestampLabel, idLabel, @@ -962,7 +1080,9 @@ public class PubsubIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - populateCommonDisplayData(builder, timestampLabel, idLabel, topic); + String topicString = topic.isAccessible() + ? topic.get().asPath() : topic.toString(); + populateCommonDisplayData(builder, timestampLabel, idLabel, topicString); } @Override @@ -971,6 +1091,10 @@ public class PubsubIO { } public PubsubTopic getTopic() { + return topic.get(); + } + + public ValueProvider<PubsubTopic> getTopicProvider() { return topic; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 179abf6..1e369c8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; @@ -205,7 +206,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { private static class WriterFn extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> { private final PubsubClientFactory pubsubFactory; - private final TopicPath topic; + private final ValueProvider<TopicPath> topic; private final String timestampLabel; private final String idLabel; private final int publishBatchSize; @@ -225,8 +226,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { createAggregator("bytes", new Sum.SumLongFn()); WriterFn( - PubsubClientFactory pubsubFactory, TopicPath topic, String timestampLabel, - String idLabel, int publishBatchSize, int publishBatchBytes) { + PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic, + String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) { this.pubsubFactory = pubsubFactory; this.topic = topic; this.timestampLabel = timestampLabel; @@ -241,7 +242,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { */ private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException { - int n = pubsubClient.publish(topic, messages); + int n = pubsubClient.publish(topic.get(), messages); checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful", messages.size(), n); batchCounter.addValue(1L); @@ -290,7 +291,11 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("topic", topic.getPath())); + String topicString = + topic == null ? null + : topic.isAccessible() ? topic.get().getPath() + : topic.toString(); + builder.add(DisplayData.item("topic", topicString)); builder.add(DisplayData.item("transport", pubsubFactory.getKind())); builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); @@ -309,7 +314,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { /** * Pubsub topic to publish to. */ - private final TopicPath topic; + private final ValueProvider<TopicPath> topic; /** * Coder for elements. It is the responsibility of the underlying Pubsub transport to @@ -363,7 +368,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { @VisibleForTesting PubsubUnboundedSink( PubsubClientFactory pubsubFactory, - TopicPath topic, + ValueProvider<TopicPath> topic, Coder<T> elementCoder, String timestampLabel, String idLabel, @@ -386,7 +391,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { public PubsubUnboundedSink( PubsubClientFactory pubsubFactory, - TopicPath topic, + ValueProvider<TopicPath> topic, Coder<T> elementCoder, String timestampLabel, String idLabel, @@ -397,7 +402,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { } public TopicPath getTopic() { - return topic; + return topic.get(); } @Nullable http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index bfacb71..4ec8389 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; @@ -1161,7 +1162,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> createAggregator("elements", new Sum.SumLongFn()); private final PubsubClientFactory pubsubFactory; - private final SubscriptionPath subscription; + private final ValueProvider<SubscriptionPath> subscription; @Nullable private final String timestampLabel; @Nullable @@ -1169,7 +1170,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> public StatsFn( PubsubClientFactory pubsubFactory, - SubscriptionPath subscription, + ValueProvider<SubscriptionPath> subscription, @Nullable String timestampLabel, @Nullable @@ -1189,7 +1190,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("subscription", subscription.getPath())); + String subscriptionString = + subscription == null ? null + : subscription.isAccessible() ? subscription.get().getPath() + : subscription.toString(); + builder.add(DisplayData.item("subscription", subscriptionString)); builder.add(DisplayData.item("transport", pubsubFactory.getKind())); builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); @@ -1215,14 +1220,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> * Project under which to create a subscription if only the {@link #topic} was given. */ @Nullable - private final ProjectPath project; + private final ValueProvider<ProjectPath> project; /** * Topic to read from. If {@literal null}, then {@link #subscription} must be given. * Otherwise {@link #subscription} must be null. */ @Nullable - private final TopicPath topic; + private final ValueProvider<TopicPath> topic; /** * Subscription to read from. If {@literal null} then {@link #topic} must be given. @@ -1233,7 +1238,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> * subscription is never deleted. */ @Nullable - private SubscriptionPath subscription; + private ValueProvider<SubscriptionPath> subscription; /** * Coder for elements. Elements are effectively double-encoded: first to a byte array @@ -1260,9 +1265,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> PubsubUnboundedSource( Clock clock, PubsubClientFactory pubsubFactory, - @Nullable ProjectPath project, - @Nullable TopicPath topic, - @Nullable SubscriptionPath subscription, + @Nullable ValueProvider<ProjectPath> project, + @Nullable ValueProvider<TopicPath> topic, + @Nullable ValueProvider<SubscriptionPath> subscription, Coder<T> elementCoder, @Nullable String timestampLabel, @Nullable String idLabel) { @@ -1285,9 +1290,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> */ public PubsubUnboundedSource( PubsubClientFactory pubsubFactory, - @Nullable ProjectPath project, - @Nullable TopicPath topic, - @Nullable SubscriptionPath subscription, + @Nullable ValueProvider<ProjectPath> project, + @Nullable ValueProvider<TopicPath> topic, + @Nullable ValueProvider<SubscriptionPath> subscription, Coder<T> elementCoder, @Nullable String timestampLabel, @Nullable String idLabel) { @@ -1300,17 +1305,17 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> @Nullable public ProjectPath getProject() { - return project; + return project == null ? null : project.get(); } @Nullable public TopicPath getTopic() { - return topic; + return topic == null ? null : topic.get(); } @Nullable public SubscriptionPath getSubscription() { - return subscription; + return subscription == null ? null : subscription.get(); } @Nullable @@ -1335,8 +1340,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> try { try (PubsubClient pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, options.as(PubsubOptions.class))) { + checkState(project.isAccessible(), "createRandomSubscription must be called at runtime."); + checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime."); SubscriptionPath subscriptionPath = - pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC); + pubsubClient.createRandomSubscription( + project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC); LOG.warn( "Created subscription {} to topic {}." + " Note this subscription WILL NOT be deleted when the pipeline terminates", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index 086b726..b73afb2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -20,9 +20,12 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import java.util.Set; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; @@ -65,29 +68,13 @@ public class PubsubIOTest { } @Test - public void testTopicValidationBadCharacter() throws Exception { - thrown.expect(IllegalArgumentException.class); - PubsubIO.Read.topic("projects/my-project/topics/abc-*-abc"); - } - - @Test - public void testTopicValidationTooLong() throws Exception { - thrown.expect(IllegalArgumentException.class); - PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-") - .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append("1111111111111111111111111111111111111111111111111111111111111111111111111111") - .toString()); - } - - @Test public void testReadDisplayData() { String topic = "projects/project/topics/topic"; String subscription = "projects/project/subscriptions/subscription"; Duration maxReadTime = Duration.standardMinutes(5); PubsubIO.Read.Bound<String> read = PubsubIO.Read - .topic(topic) - .subscription(subscription) + .topic(StaticValueProvider.of(topic)) + .subscription(StaticValueProvider.of(subscription)) .timestampLabel("myTimestamp") .idLabel("myId") .maxNumRecords(1234) @@ -104,6 +91,26 @@ public class PubsubIOTest { } @Test + public void testNullTopic() { + String subscription = "projects/project/subscriptions/subscription"; + PubsubIO.Read.Bound<String> read = PubsubIO.Read + .subscription(StaticValueProvider.of(subscription)); + assertNull(read.getTopic()); + assertNotNull(read.getSubscription()); + assertNotNull(DisplayData.from(read)); + } + + @Test + public void testNullSubscription() { + String topic = "projects/project/topics/topic"; + PubsubIO.Read.Bound<String> read = PubsubIO.Read + .topic(StaticValueProvider.of(topic)); + assertNotNull(read.getTopic()); + assertNull(read.getSubscription()); + assertNotNull(DisplayData.from(read)); + } + + @Test @Category(RunnableOnService.class) public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index 4edd9c1..518136f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; @@ -84,9 +85,9 @@ public class PubsubUnboundedSinkTest { PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.<OutgoingMessage>of())) { PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), + TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, + Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp())) @@ -113,9 +114,9 @@ public class PubsubUnboundedSinkTest { PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.<OutgoingMessage>of())) { PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), + TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, + Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) @@ -148,9 +149,10 @@ public class PubsubUnboundedSinkTest { PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.<OutgoingMessage>of())) { PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), + StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java index bbc6c12..f6165c5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader; import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.util.CoderUtils; @@ -91,8 +92,9 @@ public class PubsubUnboundedSourceTest { }; factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming); PubsubUnboundedSource<String> source = - new PubsubUnboundedSource<>(clock, factory, null, null, SUBSCRIPTION, StringUtf8Coder.of(), - TIMESTAMP_LABEL, ID_LABEL); + new PubsubUnboundedSource<>( + clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION), + StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL); primSource = new PubsubSource<>(source); } @@ -332,8 +334,8 @@ public class PubsubUnboundedSourceTest { PubsubUnboundedSource<String> source = new PubsubUnboundedSource<>( factory, - PubsubClient.projectPathFromId("my_project"), - topicPath, + StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")), + StaticValueProvider.of(topicPath), null, StringUtf8Coder.of(), null, @@ -363,8 +365,8 @@ public class PubsubUnboundedSourceTest { PubsubUnboundedSource<String> source = new PubsubUnboundedSource<>( factory, - PubsubClient.projectPathFromId("my_project"), - topicPath, + StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")), + StaticValueProvider.of(topicPath), null, StringUtf8Coder.of(), null,