[ 
https://issues.apache.org/jira/browse/BEAM-7738?focusedWorklogId=289231&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289231
 ]

ASF GitHub Bot logged work on BEAM-7738:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Aug/19 22:21
            Start Date: 05/Aug/19 22:21
    Worklog Time Spent: 10m 
      Work Description: chadrik commented on pull request #9268: [BEAM-7738] Add external transform support to PubSubIO
URL: https://github.com/apache/beam/pull/9268#discussion_r310815296
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 ##########
 @@ -674,6 +680,85 @@ public String toString() {
       abstract Builder<T> setClock(@Nullable Clock clock);
 
       abstract Read<T> build();
+
+      @Override
+      public PTransform<PBegin, PCollection<T>> buildExternal(External.Configuration config) {
+        if (config.topic != null) {
+          StaticValueProvider<String> topic = StaticValueProvider.of(utf8String(config.topic));
+          setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()));
+        }
+        if (config.subscription != null) {
+          StaticValueProvider<String> subscription =
+              StaticValueProvider.of(utf8String(config.subscription));
+          setSubscriptionProvider(
+              NestedValueProvider.of(subscription, new SubscriptionTranslator()));
+        }
+        if (config.idAttribute != null) {
+          String idAttribute = utf8String(config.idAttribute);
+          setIdAttribute(idAttribute);
+        }
+        if (config.timestampAttribute != null) {
+          String timestampAttribute = utf8String(config.timestampAttribute);
+          setTimestampAttribute(timestampAttribute);
+        }
+        setNeedsAttributes(config.needsAttributes);
+        setPubsubClientFactory(FACTORY);
+        if (config.needsAttributes) {
+          SimpleFunction<PubsubMessage, T> parseFn =
+              (SimpleFunction<PubsubMessage, T>) new IdentityMessageFn();
+          setParseFn(parseFn);
+          // FIXME: call setCoder(). need to use PubsubMessage proto to be compatible with python
 
 Review comment:
   When an external SDK requests `needsAttributes`, this transform needs to
produce `PubsubMessage` instances rather than byte arrays. There's a wrinkle
here: Python deserializes `PubsubMessage` using protobufs, whereas Java
encodes it with `PubsubMessageWithAttributesCoder`, so the bytes Java emits
are not something the Python side can decode.
   
   I believe we want to encode with protobufs on the Java side as well. Can I
get confirmation of that, please?
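
   To make the target format concrete, here is a minimal sketch of the bytes a protobuf-based coder would have to emit. It hand-encodes the protobuf wire format for the `data` (field 1) and `attributes` (field 2, a string-to-string map) fields of `google.pubsub.v1.PubsubMessage`. The class and method names (`PubsubProtoSketch`, `encode`) are hypothetical and not part of this PR; in practice one would presumably delegate to the generated `com.google.pubsub.v1.PubsubMessage` class rather than write bytes by hand.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: hand-rolled protobuf wire encoding of the
// data and attributes fields of google.pubsub.v1.PubsubMessage.
class PubsubProtoSketch {

  // Writes a non-negative int as a protobuf base-128 varint.
  static void writeVarint(ByteArrayOutputStream out, int value) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Writes a length-delimited field (wire type 2): tag, length, raw bytes.
  static void writeBytesField(ByteArrayOutputStream out, int fieldNumber, byte[] bytes) {
    writeVarint(out, (fieldNumber << 3) | 2);
    writeVarint(out, bytes.length);
    out.write(bytes, 0, bytes.length);
  }

  // Encodes the payload as field 1 and each attribute as a map entry in field 2.
  // A map entry is itself a small message: key = field 1, value = field 2.
  static byte[] encode(byte[] payload, Map<String, String> attributes) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    if (payload.length > 0) {
      writeBytesField(out, 1, payload);
    }
    for (Map.Entry<String, String> e : attributes.entrySet()) {
      ByteArrayOutputStream entry = new ByteArrayOutputStream();
      writeBytesField(entry, 1, e.getKey().getBytes(StandardCharsets.UTF_8));
      writeBytesField(entry, 2, e.getValue().getBytes(StandardCharsets.UTF_8));
      writeBytesField(out, 2, entry.toByteArray());
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    Map<String, String> attrs = new LinkedHashMap<>();
    attrs.put("k", "v");
    byte[] bytes = encode("hi".getBytes(StandardCharsets.UTF_8), attrs);
    StringBuilder hex = new StringBuilder();
    for (byte b : bytes) {
      hex.append(String.format("%02X ", b));
    }
    System.out.println(hex.toString().trim());
  }
}
```

   Whatever coder ends up being set in `buildExternal`, the point is that its output would need to match this layout byte for byte for the Python side to parse it as a `PubsubMessage` proto.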
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 289231)
    Time Spent: 20m  (was: 10m)

> Support PubSubIO to be configured externally for use with other SDKs
> --------------------------------------------------------------------
>
>                 Key: BEAM-7738
>                 URL: https://issues.apache.org/jira/browse/BEAM-7738
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp, runner-flink, sdk-py-core
>            Reporter: Chad Dombrova
>            Assignee: Chad Dombrova
>            Priority: Major
>              Labels: portability
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Now that KafkaIO is supported via the external transform API (BEAM-7029) we 
> should add support for PubSub.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
