[
https://issues.apache.org/jira/browse/BEAM-5806?focusedWorklogId=156839&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-156839
]
ASF GitHub Bot logged work on BEAM-5806:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Oct/18 09:55
Start Date: 22/Oct/18 09:55
Worklog Time Spent: 10m
Work Description: lhauspie closed pull request #6769: WIP: [BEAM-5806]
Update PubsubIO to be able to change the PubsubClientFactory
URL: https://github.com/apache/beam/pull/6769
This is a PR from a forked repository.
Because GitHub hides the diff of a foreign (forked) pull request once it is
merged or closed, the diff is reproduced below for the sake of provenance:
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index e39b44f95bd..e944c6193b3 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
@@ -534,6 +535,9 @@ public String toString() {
@Nullable
abstract ValueProvider<PubsubTopic> getTopicProvider();
+ @Nullable
+ abstract PubsubClient.PubsubClientFactory getClientFactory();
+
@Nullable
abstract ValueProvider<PubsubSubscription> getSubscriptionProvider();
@@ -573,6 +577,8 @@ public String toString() {
abstract Builder<T> setNeedsAttributes(boolean needsAttributes);
+ abstract Builder<T> setClientFactory(PubsubClient.PubsubClientFactory
factory);
+
abstract Read<T> build();
}
@@ -627,6 +633,16 @@ public String toString() {
.build();
}
+ /**
+ * The default client to write to Pub/Sub is the {@link PubsubJsonClient},
created by the {@link
+ * PubsubJsonClient.PubsubJsonClientFactory}. This function allows to
change the Pub/Sub client
+ * by providing another {@link PubsubClient.PubsubClientFactory} like the
{@link
+ * PubsubGrpcClientFactory}.
+ */
+ public Read<T> withClientFactory(PubsubClient.PubsubClientFactory factory)
{
+ return toBuilder().setClientFactory(factory).build();
+ }
+
/**
* When reading from Cloud Pub/Sub where record timestamps are provided as
Pub/Sub message
* attributes, specifies the name of the attribute that contains the
timestamp.
@@ -705,7 +721,7 @@ public String toString() {
: NestedValueProvider.of(getSubscriptionProvider(), new
SubscriptionPathTranslator());
PubsubUnboundedSource source =
new PubsubUnboundedSource(
- FACTORY,
+ Optional.ofNullable(getClientFactory()).orElse(FACTORY),
null /* always get project from runtime PipelineOptions */,
topicPath,
subscriptionPath,
@@ -740,6 +756,9 @@ private PubsubIO() {}
@Nullable
abstract ValueProvider<PubsubTopic> getTopicProvider();
+ @Nullable
+ abstract PubsubClient.PubsubClientFactory getClientFactory();
+
/** the batch size for bulk submissions to pubsub. */
@Nullable
abstract Integer getMaxBatchSize();
@@ -776,6 +795,8 @@ private PubsubIO() {}
abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage>
formatFn);
+ abstract Builder<T> setClientFactory(PubsubClient.PubsubClientFactory
factory);
+
abstract Write<T> build();
}
@@ -796,6 +817,16 @@ private PubsubIO() {}
.build();
}
+ /**
+ * The default client to write to Pub/Sub is the {@link PubsubJsonClient},
created by the {@link
+ * PubsubJsonClient.PubsubJsonClientFactory}. This function allows to
change the Pub/Sub client
+ * by providing another {@link PubsubClient.PubsubClientFactory} like the
{@link
+ * PubsubGrpcClientFactory}.
+ */
+ public Write<T> withClientFactory(PubsubClient.PubsubClientFactory
factory) {
+ return toBuilder().setClientFactory(factory).build();
+ }
+
/**
* Writes to Pub/Sub are batched to efficiently send data. The value of
the attribute will be a
* number representing the number of Pub/Sub messages to queue before
sending off the bulk
@@ -874,7 +905,7 @@ public PDone expand(PCollection<T> input) {
.apply(MapElements.via(getFormatFn()))
.apply(
new PubsubUnboundedSink(
- FACTORY,
+ Optional.ofNullable(getClientFactory()).orElse(FACTORY),
NestedValueProvider.of(getTopicProvider(), new
TopicPathTranslator()),
getTimestampAttribute(),
getIdAttribute(),
@@ -924,8 +955,10 @@ public void startBundle(StartBundleContext c) throws
IOException {
// NOTE: idAttribute is ignored.
this.pubsubClient =
- FACTORY.newClient(
- getTimestampAttribute(), null,
c.getPipelineOptions().as(PubsubOptions.class));
+ Optional.ofNullable(getClientFactory())
+ .orElse(FACTORY)
+ .newClient(
+ getTimestampAttribute(), null,
c.getPipelineOptions().as(PubsubOptions.class));
}
@ProcessElement
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 48a6a7eb957..c58311d9452 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -220,6 +220,23 @@ public void testPrimitiveReadDisplayData() {
hasItem(hasDisplayItem("topic")));
}
+ @Test
+ public void testReadWithPubsubGrpcClientFactory() {
+ String topic = "projects/project/topics/topic";
+ PubsubIO.Read<String> read =
+ PubsubIO.readStrings()
+ .fromTopic(StaticValueProvider.of(topic))
+ .withClientFactory(PubsubGrpcClient.FACTORY)
+ .withTimestampAttribute("myTimestamp")
+ .withIdAttribute("myId");
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("topic", topic));
+ assertThat(displayData, hasDisplayItem("timestampAttribute",
"myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
+ }
+
@Test
public void testWriteDisplayData() {
String topic = "projects/project/topics/topic";
@@ -248,4 +265,22 @@ public void testPrimitiveWriteDisplayData() {
displayData,
hasItem(hasDisplayItem("topic")));
}
+
+
+ @Test
+ public void testWriteWithPubsubGrpcClientFactory() {
+ String topic = "projects/project/topics/topic";
+ PubsubIO.Write<?> write =
+ PubsubIO.writeStrings()
+ .to(topic)
+ .withClientFactory(PubsubGrpcClient.FACTORY)
+ .withTimestampAttribute("myTimestamp")
+ .withIdAttribute("myId");
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("topic", topic));
+ assertThat(displayData, hasDisplayItem("timestampAttribute",
"myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idAttribute", "myId"));
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 156839)
Time Spent: 1h (was: 50m)
Remaining Estimate: 23h (was: 23h 10m)
> Allow to change the PubsubClientFactory when using PubsubIO
> -----------------------------------------------------------
>
> Key: BEAM-5806
> URL: https://issues.apache.org/jira/browse/BEAM-5806
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Logan HAUSPIE
> Assignee: Chamikara Jayalath
> Priority: Minor
> Original Estimate: 24h
> Time Spent: 1h
> Remaining Estimate: 23h
>
> When using PubsubIO to read from or write to Pub/Sub, we are obliged to use
> the PubsubJsonClient to interact with the Pub/Sub API.
> The PubsubJsonClient encodes each message in Base64, which increases its
> size by roughly a third (about 33%), and there is no way to change the
> PubsubClient used by PubsubIO.
>
> What I suggest is to allow developers to change the PubsubClientFactory by
> specifying it at definition time, as follows:
> {code:java}
> PubsubIO.Read<String> read = PubsubIO.readStrings()
>     .fromTopic(StaticValueProvider.of(topic))
>     .withTimestampAttribute("myTimestamp")
>     .withIdAttribute("myId")
>     .withClientFactory(PubsubGrpcClient.FACTORY); // proposed new option
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)