[
https://issues.apache.org/jira/browse/BEAM-12297?focusedWorklogId=609210&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-609210
]
ASF GitHub Bot logged work on BEAM-12297:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Jun/21 15:42
Start Date: 09/Jun/21 15:42
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on a change in pull request #14971:
URL: https://github.com/apache/beam/pull/14971#discussion_r648441321
##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
##########
@@ -416,6 +423,96 @@ public void after() throws IOException {
}
}
+ @Test
+ public void testProto() {
+ ProtoCoder<Primitive> coder = ProtoCoder.of(Primitive.class);
+ ImmutableList<Primitive> inputs =
+ ImmutableList.of(
+ Primitive.newBuilder().setPrimitiveInt32(42).build(),
+ Primitive.newBuilder().setPrimitiveBool(true).build(),
+ Primitive.newBuilder().setPrimitiveString("Hello, World!").build());
+ setupTestClient(inputs, coder);
+ PCollection<Primitive> read =
+ readPipeline.apply(
+ PubsubIO.readProtos(Primitive.class)
+ .fromSubscription(SUBSCRIPTION.getPath())
+ .withClock(CLOCK)
+ .withClientFactory(clientFactory));
+ PAssert.that(read).containsInAnyOrder(inputs);
+ readPipeline.run();
+ }
+
+ @Test
+ public void testProtoDynamicMessage() {
+ ProtoCoder<Primitive> coder = ProtoCoder.of(Primitive.class);
+ ImmutableList<Primitive> inputs =
+ ImmutableList.of(
+ Primitive.newBuilder().setPrimitiveInt32(42).build(),
+ Primitive.newBuilder().setPrimitiveBool(true).build(),
+ Primitive.newBuilder().setPrimitiveString("Hello, World!").build());
+ setupTestClient(inputs, coder);
+
+ ProtoDomain domain = ProtoDomain.buildFrom(Primitive.getDescriptor());
+ String name = Primitive.getDescriptor().getFullName();
+ PCollection<Primitive> read =
+ readPipeline
+ .apply(
+ PubsubIO.readProtoDynamicMessage(domain, name)
+ .fromSubscription(SUBSCRIPTION.getPath())
+ .withClock(CLOCK)
+ .withClientFactory(clientFactory))
+ // DynamicMessage doesn't work well with PAssert, but if the content can be successfully
+ // converted back into the original Primitive, then that should be good enough to
+ // consider it a successful read.
+ .apply(
+ "Return To Primitive",
+ MapElements.into(TypeDescriptor.of(Primitive.class))
+ .via(
+ (DynamicMessage message) -> {
Review comment:
`MapElements` does have `exceptionsInto` and
[`exceptionsVia`](https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/MapElements.html#exceptionsVia-org.apache.beam.sdk.transforms.InferableFunction-)
- would that work here?
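
For readers unfamiliar with the suggestion: with `exceptionsInto`/`exceptionsVia`, an exception thrown by the mapping function is turned into a failure record and collected separately from the successful output, instead of being caught and rethrown inside the lambda. The sketch below models only that idea in plain Java so it runs without Beam on the classpath. It is not Beam API: `MapWithFailuresSketch`, `ThrowingFunction`, `FailureHandler`, `Result`, and `mapWithFailures` are hypothetical names invented for illustration, and integer parsing stands in for the `DynamicMessage`-to-`Primitive` conversion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model (not Beam API) of routing per-element exceptions to a failures list. */
final class MapWithFailuresSketch {

  /** A mapping step that is allowed to throw, such as a parse call. */
  interface ThrowingFunction<I, O> {
    O apply(I input) throws Exception;
  }

  /** Converts a failed input and its exception into a failure record. */
  interface FailureHandler<I, F> {
    F apply(I input, Exception e);
  }

  /** Successful outputs and failure records, kept side by side. */
  static final class Result<O, F> {
    final List<O> output = new ArrayList<>();
    final List<F> failures = new ArrayList<>();
  }

  /** Applies fn to every input; an exception becomes a failure record instead of propagating. */
  static <I, O, F> Result<O, F> mapWithFailures(
      List<I> inputs, ThrowingFunction<I, O> fn, FailureHandler<I, F> onFailure) {
    Result<O, F> result = new Result<>();
    for (I input : inputs) {
      try {
        result.output.add(fn.apply(input));
      } catch (Exception e) {
        result.failures.add(onFailure.apply(input, e));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Result<Integer, String> r =
        mapWithFailures(
            Arrays.asList("42", "oops", "7"),
            (String s) -> Integer.parseInt(s), // stand-in for the parse step
            (input, e) -> "could not parse '" + input + "'");
    System.out.println(r.output);
    System.out.println(r.failures);
  }
}
```

In a test like the one under review, the analogous check would be that the failures side is empty while the output side matches the expected inputs. Whether that reads better than the try/catch is a judgment call for the PR author.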
Issue Time Tracking
-------------------
Worklog Id: (was: 609210)
Time Spent: 1h (was: 50m)
> PubsubIO support for reading protos from schema
> -----------------------------------------------
>
> Key: BEAM-12297
> URL: https://issues.apache.org/jira/browse/BEAM-12297
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Zachary Houfek
> Assignee: Zachary Houfek
> Priority: P2
> Time Spent: 1h
> Remaining Estimate: 0h
>
> We're trying to add a Dataflow template for Pubsub -> BigQuery where the
> Pubsub topic contains serialized proto data. This is similar to our existing
> template for Pubsub -> BigQuery using Avro.
> However, it seems the PubsubIO currently only supports protos where the class
> is known.[1] We'll need something similar to the generic Avro reader[2].
>
> [1]
> [https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L478]
> [2]
> [https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L515]
>