[ 
https://issues.apache.org/jira/browse/BEAM-7738?focusedWorklogId=332151&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-332151
 ]

ASF GitHub Bot logged work on BEAM-7738:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Oct/19 18:54
            Start Date: 22/Oct/19 18:54
    Worklog Time Spent: 10m 
      Work Description: chadrik commented on issue #9268: [BEAM-7738] Add 
external transform support to PubsubIO
URL: https://github.com/apache/beam/pull/9268#issuecomment-545103766
 
 
   > Are you thinking you'd use beam:coder:row:v1 as the interface for the 
external transform, and the Java ExternalTransform implementations would handle 
the conversion of Row to/from PubsubMessage? 
   
   I see two places where `beam:coder:row:v1` would be useful:
   
   1. As a way to declare the construction interface of an external transform 
and encode its values. A schema coder would replace the `configuration` 
mapping in the `pipeline.ExternalConfiguration.ExternalConfigurationPayload` 
proto.
   2. As a coder for structured elements that are exchanged between SDKs.
   
   Right now I'm mostly thinking about the latter, which is when 
`PubsubMessage` comes into play.
   
   > There's no trivial way to register a converter between Row and 
PubsubMessage since the latter isn't structured, 
   
   Maybe I'm thinking about this wrong, but I think `PubsubMessage` _is_ 
structured:
   
   ```java
   public class PubsubMessage {
   
     private byte[] message;
     private Map<String, String> attributes;
     private String messageId;
   
     /** Returns the main PubSub message. */
     public byte[] getPayload() {
       return message;
     }
   
     /** Returns the full map of attributes. This is an unmodifiable map. */
     public Map<String, String> getAttributeMap() {
       return attributes;
     }
   
     /** Returns the messageId of the message populated by Cloud Pub/Sub. */
     @Nullable
     public String getMessageId() {
       return messageId;
     }
   }
   ```
   
   I'm not a Java expert by any means, but this seems like a type that would 
work with AutoValue; we would just need to rename `message` to `payload` and 
`attributes` to `attributeMap`.
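
To make the shape concrete, here is a minimal sketch of the renamed type in
plain Java. `StructuredPubsubMessage`, `toFields`, and `fromFields` are
hypothetical names, not Beam or AutoValue APIs, and the map of named fields
only stands in for a Row; a real converter would presumably go through Beam's
schema machinery instead.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for PubsubMessage with the renamed fields; not Beam's class. */
final class StructuredPubsubMessage {
  private final byte[] payload;
  private final Map<String, String> attributeMap;
  private final String messageId; // may be null, as in the original getter

  StructuredPubsubMessage(byte[] payload, Map<String, String> attributeMap, String messageId) {
    this.payload = payload.clone();
    this.attributeMap = Collections.unmodifiableMap(new LinkedHashMap<>(attributeMap));
    this.messageId = messageId;
  }

  byte[] getPayload() {
    return payload.clone();
  }

  Map<String, String> getAttributeMap() {
    return attributeMap;
  }

  String getMessageId() {
    return messageId;
  }

  /** Flattens the message into three named fields, in declaration order. */
  Map<String, Object> toFields() {
    Map<String, Object> fields = new LinkedHashMap<>();
    fields.put("payload", getPayload());
    fields.put("attributeMap", attributeMap);
    fields.put("messageId", messageId);
    return fields;
  }

  /** Rebuilds a message from the named fields produced by toFields(). */
  static StructuredPubsubMessage fromFields(Map<String, Object> fields) {
    @SuppressWarnings("unchecked")
    Map<String, String> attrs = (Map<String, String>) fields.get("attributeMap");
    return new StructuredPubsubMessage(
        (byte[]) fields.get("payload"), attrs, (String) fields.get("messageId"));
  }
}
```

The point is only that every field has a name and a simple type (bytes, a
string-to-string map, a nullable string), which is what I mean by structured.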
   
   What are the requirements for registering a Row converter?
   
   > but of course on the Java side we could have code to serialize the Row to 
a variety of formats to put in the PubsubMessage payload: Avro, JSON, or the 
row serialization format itself (although I'm not sure we'd want to encourage 
using that outside of Beam), would be pretty simple to add. 
   
   I don't think the payload is a concern for the portability of external 
transforms: it gets encoded/decoded by another transform, not by 
PubsubRead/Write. We can just treat it as a byte array.
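
As a rough illustration of that split, the sketch below keeps the payload
opaque at the read/write boundary and leaves decoding to a separate step.
`OpaquePayloadSketch` and its members are hypothetical names, and UTF-8 is
just an assumed payload format chosen for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical sketch: the IO step never interprets the payload bytes. */
final class OpaquePayloadSketch {
  /** Stand-in for the cross-SDK read/write step: bytes pass through untouched. */
  static byte[] passThrough(byte[] payload) {
    return payload.clone();
  }

  /** Stand-in for a separate user transform that knows the format (UTF-8 assumed). */
  static final Function<byte[], String> DECODE_UTF8 =
      bytes -> new String(bytes, StandardCharsets.UTF_8);

  /** The matching encoder applied before the write step. */
  static final Function<String, byte[]> ENCODE_UTF8 =
      text -> text.getBytes(StandardCharsets.UTF_8);
}
```

Swapping UTF-8 for Avro or JSON would only change the encode/decode functions,
not the pass-through step.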
   
   My grasp of the Java side is a bit tenuous, so I'd like @mxm to confirm 
or deny what I've written here.
   
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 332151)
    Time Spent: 7h 10m  (was: 7h)

> Support PubSubIO to be configured externally for use with other SDKs
> --------------------------------------------------------------------
>
>                 Key: BEAM-7738
>                 URL: https://issues.apache.org/jira/browse/BEAM-7738
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp, runner-flink, sdk-py-core
>            Reporter: Chad Dombrova
>            Assignee: Chad Dombrova
>            Priority: Major
>              Labels: portability
>          Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> Now that KafkaIO is supported via the external transform API (BEAM-7029) we 
> should add support for PubSub.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
