Repository: beam Updated Branches: refs/heads/master f15b52fa3 -> 338012d14
PubSubIO: fix and improve testing for DisplayData. Also adds better type/nullability checking in the code. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4406414a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4406414a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4406414a Branch: refs/heads/master Commit: 4406414a52e45213de5521bff4a7599b8aa53a71 Parents: f15b52f Author: Dan Halperin <[email protected]> Authored: Mon Jan 23 10:01:22 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Mon Jan 23 16:21:23 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/PubsubIO.java | 28 ++++++++++---- .../beam/sdk/io/PubsubUnboundedSource.java | 32 +++++++++++----- .../org/apache/beam/sdk/io/PubsubIOTest.java | 39 ++++++++++++++++---- 3 files changed, 75 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 1471953..806b7da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -556,9 +556,10 @@ public class PubsubIO { // Validate. 
PubsubSubscription.fromPath(subscription.get()); } - return new Read<>(name, - NestedValueProvider.of(subscription, new SubscriptionTranslator()), - topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn); + return new Read<>( + name, NestedValueProvider.of(subscription, new SubscriptionTranslator()), + null /* reset topic to null */, timestampLabel, coder, idLabel, maxNumRecords, + maxReadTime, parseFn); } /** @@ -584,7 +585,7 @@ public class PubsubIO { // Validate. PubsubTopic.fromPath(topic.get()); } - return new Read<>(name, subscription, + return new Read<>(name, null /* reset subscription to null */, NestedValueProvider.of(topic, new TopicTranslator()), timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn); } @@ -757,6 +758,7 @@ public class PubsubIO { /** * Get the topic being read from. */ + @Nullable public PubsubTopic getTopic() { return topic == null ? null : topic.get(); } @@ -771,6 +773,7 @@ public class PubsubIO { /** * Get the subscription being read from. */ + @Nullable public PubsubSubscription getSubscription() { return subscription == null ? null : subscription.get(); } @@ -785,6 +788,7 @@ public class PubsubIO { /** * Get the timestamp label. */ + @Nullable public String getTimestampLabel() { return timestampLabel; } @@ -792,6 +796,7 @@ public class PubsubIO { /** * Get the id label. */ + @Nullable public String getIdLabel() { return idLabel; } @@ -800,6 +805,7 @@ public class PubsubIO { /** * Get the {@link Coder} used for the transform's output. */ + @Nullable public Coder<T> getCoder() { return coder; } @@ -814,6 +820,7 @@ public class PubsubIO { /** * Get the maximum read time. */ + @Nullable public Duration getMaxReadTime() { return maxReadTime; } @@ -821,6 +828,7 @@ public class PubsubIO { /** * Get the parse function used for PubSub attributes. 
*/ + @Nullable public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() { return parseFn; } @@ -1074,15 +1082,17 @@ public class PubsubIO { } /** - * Returns the PubSub topic being read from. + * Returns the PubSub topic being written to. */ + @Nullable public PubsubTopic getTopic() { - return topic.get(); + return (topic == null) ? null : topic.get(); } /** - * Returns the {@link ValueProvider} for the topic being read from. + * Returns the {@link ValueProvider} for the topic being written to. */ + @Nullable public ValueProvider<PubsubTopic> getTopicProvider() { return topic; } @@ -1090,6 +1100,7 @@ public class PubsubIO { /** * Returns the timestamp label. */ + @Nullable public String getTimestampLabel() { return timestampLabel; } @@ -1097,6 +1108,7 @@ public class PubsubIO { /** * Returns the id label. */ + @Nullable public String getIdLabel() { return idLabel; } @@ -1104,6 +1116,7 @@ public class PubsubIO { /** * Returns the output coder. */ + @Nullable public Coder<T> getCoder() { return coder; } @@ -1111,6 +1124,7 @@ public class PubsubIO { /** * Returns the formatting function used if publishing attributes. 
*/ + @Nullable public SimpleFunction<T, PubsubMessage> getFormatFn() { return formatFn; } http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index c1f8720..6c8a788 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -1173,21 +1173,25 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> createAggregator("elements", Sum.ofLongs()); private final PubsubClientFactory pubsubFactory; + @Nullable private final ValueProvider<SubscriptionPath> subscription; @Nullable + private final ValueProvider<TopicPath> topic; + @Nullable private final String timestampLabel; @Nullable private final String idLabel; public StatsFn( PubsubClientFactory pubsubFactory, - ValueProvider<SubscriptionPath> subscription, - @Nullable - String timestampLabel, - @Nullable - String idLabel) { + @Nullable ValueProvider<SubscriptionPath> subscription, + @Nullable ValueProvider<TopicPath> topic, + @Nullable String timestampLabel, + @Nullable String idLabel) { + checkArgument(pubsubFactory != null, "pubsubFactory should not be null"); this.pubsubFactory = pubsubFactory; this.subscription = subscription; + this.topic = topic; this.timestampLabel = timestampLabel; this.idLabel = idLabel; } @@ -1201,11 +1205,18 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - String subscriptionString = - subscription == null ? null - : subscription.isAccessible() ? 
subscription.get().getPath() + if (subscription != null) { + String subscriptionString = subscription.isAccessible() + ? subscription.get().getPath() : subscription.toString(); - builder.add(DisplayData.item("subscription", subscriptionString)); + builder.add(DisplayData.item("subscription", subscriptionString)); + } + if (topic != null) { + String topicString = topic.isAccessible() + ? topic.get().getPath() + : topic.toString(); + builder.add(DisplayData.item("topic", topicString)); + } builder.add(DisplayData.item("transport", pubsubFactory.getKind())); builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); @@ -1397,7 +1408,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> return input.getPipeline().begin() .apply(Read.from(new PubsubSource<T>(this))) .apply("PubsubUnboundedSource.Stats", - ParDo.of(new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel))); + ParDo.of(new StatsFn<T>( + pubsubFactory, subscription, topic, timestampLabel, idLabel))); } private SubscriptionPath createRandomSubscription(PipelineOptions options) { http://git-wip-us.apache.org/repos/asf/beam/blob/4406414a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index e15562e..a0d58ea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -88,13 +88,12 @@ public class PubsubIOTest { } @Test - public void testReadDisplayData() { + public void testReadTopicDisplayData() { String topic = "projects/project/topics/topic"; String subscription = "projects/project/subscriptions/subscription"; Duration maxReadTime = 
Duration.standardMinutes(5); PubsubIO.Read<String> read = PubsubIO.<String>read() .topic(StaticValueProvider.of(topic)) - .subscription(StaticValueProvider.of(subscription)) .timestampLabel("myTimestamp") .idLabel("myId") .maxNumRecords(1234) @@ -103,6 +102,26 @@ public class PubsubIOTest { DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("topic", topic)); + assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); + assertThat(displayData, hasDisplayItem("idLabel", "myId")); + assertThat(displayData, hasDisplayItem("maxNumRecords", 1234)); + assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); + } + + @Test + public void testReadSubscriptionDisplayData() { + String topic = "projects/project/topics/topic"; + String subscription = "projects/project/subscriptions/subscription"; + Duration maxReadTime = Duration.standardMinutes(5); + PubsubIO.Read<String> read = PubsubIO.<String>read() + .subscription(StaticValueProvider.of(subscription)) + .timestampLabel("myTimestamp") + .idLabel("myId") + .maxNumRecords(1234) + .maxReadTime(maxReadTime); + + DisplayData displayData = DisplayData.from(read); + assertThat(displayData, hasDisplayItem("subscription", subscription)); assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); assertThat(displayData, hasDisplayItem("idLabel", "myId")); @@ -134,14 +153,20 @@ public class PubsubIOTest { @Category(RunnableOnService.class) public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PubsubIO.Read<String> read = - PubsubIO.<String>read().subscription("projects/project/subscriptions/subscription") - .maxNumRecords(1) - .withCoder(StringUtf8Coder.of()); + Set<DisplayData> displayData; + PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of()); - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + // Reading from a subscription. 
+ read = read.subscription("projects/project/subscriptions/subscription"); + displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); assertThat("PubsubIO.Read should include the subscription in its primitive display data", displayData, hasItem(hasDisplayItem("subscription"))); + + // Reading from a topic. + read = read.topic("projects/project/topics/topic"); + displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat("PubsubIO.Read should include the topic in its primitive display data", + displayData, hasItem(hasDisplayItem("topic"))); } @Test
