[ https://issues.apache.org/jira/browse/BEAM-12297?focusedWorklogId=609230&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-609230 ]

ASF GitHub Bot logged work on BEAM-12297:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jun/21 16:33
            Start Date: 09/Jun/21 16:33
    Worklog Time Spent: 10m 
      Work Description: zhoufek commented on a change in pull request #14971:
URL: https://github.com/apache/beam/pull/14971#discussion_r648482674



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -483,6 +488,54 @@ public String toString() {
     return Read.newBuilder(parsePayloadUsingCoder(coder)).setCoder(coder).build();
   }
 
+  /**
+   * Returns a {@link PTransform} that continuously reads binary encoded protobuf messages for the
+   * type specified by {@code fullMessageName}.
+   *
+   * <p>This is primarily here for cases where the message type cannot be known at compile time. If
+   * it can be known, prefer {@link PubsubIO#readProtos(Class)}, as {@link DynamicMessage} tends to
+   * perform worse than concrete types.
+   *
+   * <p>Beam will infer a schema for the {@link DynamicMessage}. Note that some proto schema
+   * features are not supported by all sinks.
+   *
+   * @param domain The {@link ProtoDomain} that contains the target message and its dependencies.
+   * @param fullMessageName The full name of the message for lookup in {@code domain}.
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static Read<DynamicMessage> readProtoDynamicMessage(
+      ProtoDomain domain, String fullMessageName) {
+    SerializableFunction<PubsubMessage, DynamicMessage> parser =
+        message -> {
+          try {
+            return DynamicMessage.parseFrom(
+                domain.getDescriptor(fullMessageName), message.getPayload());
+          } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException("Could not parse Pub/Sub message", e);

Review comment:
       Currently, it isn't supported, and the other methods that parse binary content for protos or Avro throw a RuntimeException in the same way.
   
   I do think it would be valuable to have something like `withDeadLetter` to configure the returned `Read` object, but I think that's for a separate PR/Jira.
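   The failure mode being discussed (a malformed payload surfacing as a RuntimeException rather than being routed to a dead-letter destination) can be illustrated with a stdlib-only sketch. The names below (`ParserSketch`, `ParseException`, `parsePayload`) are hypothetical stand-ins, not Beam or protobuf APIs; `ParseException` plays the role of `InvalidProtocolBufferException`:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ParserSketch {
    // Stand-in for a checked parse failure such as InvalidProtocolBufferException.
    static class ParseException extends Exception {
        ParseException(String msg) { super(msg); }
    }

    // Parses a payload, throwing a checked exception on malformed input.
    static String parsePayload(byte[] payload) throws ParseException {
        String s = new String(payload, StandardCharsets.UTF_8);
        if (s.isEmpty()) {
            throw new ParseException("empty payload");
        }
        return s;
    }

    // Mirrors the pattern in the diff above: the lambda wraps the checked
    // exception in a RuntimeException, so a bad message fails loudly instead
    // of being silently dropped or rerouted.
    static final Function<byte[], String> PARSER =
        payload -> {
            try {
                return parsePayload(payload);
            } catch (ParseException e) {
                throw new RuntimeException("Could not parse Pub/Sub message", e);
            }
        };

    public static void main(String[] args) {
        System.out.println(PARSER.apply("hello".getBytes(StandardCharsets.UTF_8)));
        try {
            PARSER.apply(new byte[0]);
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

   A hypothetical `withDeadLetter`-style option would replace the `throw` with a write to a configured side output; as the comment notes, that belongs in a separate change.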




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 609230)
    Time Spent: 1h 20m  (was: 1h 10m)

> PubsubIO support for reading protos from schema
> -----------------------------------------------
>
>                 Key: BEAM-12297
>                 URL: https://issues.apache.org/jira/browse/BEAM-12297
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Zachary Houfek
>            Assignee: Zachary Houfek
>            Priority: P2
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> We're trying to add a Dataflow template for Pubsub -> BigQuery where the 
> Pubsub topic contains serialized proto data. This is similar to our existing 
> template for Pubsub -> BigQuery using Avro.
> However, it seems the PubsubIO currently only supports protos where the class 
> is known.[1] We'll need something similar to the generic Avro reader[2].
>  
> [1] 
> [https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L478]
> [2] 
> [https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L515]
>  
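The generic reader requested here hinges on resolving a message type by its full name at pipeline-construction time rather than at compile time, which is what `ProtoDomain#getDescriptor(fullMessageName)` provides in the PR. A stdlib-only analogy of that name-based lookup follows; `DescriptorRegistry`, `register`, and `resolve` are hypothetical names for illustration, not Beam or protobuf APIs:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class DescriptorRegistry {
    // Maps a fully qualified message name to a parser for that type,
    // standing in for a ProtoDomain descriptor lookup.
    private final Map<String, Function<byte[], Object>> parsers = new HashMap<>();

    void register(String fullMessageName, Function<byte[], Object> parser) {
        parsers.put(fullMessageName, parser);
    }

    // Resolves a parser by name at runtime; fails fast when the name is
    // unknown, much like looking up a descriptor absent from the domain.
    Function<byte[], Object> resolve(String fullMessageName) {
        Function<byte[], Object> parser = parsers.get(fullMessageName);
        if (parser == null) {
            throw new IllegalArgumentException("Unknown message type: " + fullMessageName);
        }
        return parser;
    }

    public static void main(String[] args) {
        DescriptorRegistry registry = new DescriptorRegistry();
        registry.register("example.Event", bytes -> new String(bytes));
        System.out.println(registry.resolve("example.Event").apply("payload".getBytes()));
    }
}
```

In the real API the registry role is played by `ProtoDomain` and the resolved value is a protobuf `Descriptor` fed to `DynamicMessage.parseFrom`.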



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
