This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 88af35e6df4 PubsubIO: Add readMessagesWithAttributesWithCoderAndParseFn (#31206)
88af35e6df4 is described below
commit 88af35e6df4de91a05b6e637ce840742700187c3
Author: Maja Kontrec Rönn <[email protected]>
AuthorDate: Tue May 21 12:22:31 2024 +0200
PubsubIO: Add readMessagesWithAttributesWithCoderAndParseFn (#31206)
---
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 11 ++++++
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 43 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 1d687812560..01848d92d92 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -671,6 +671,17 @@ public class PubsubIO {
     return Read.newBuilder(parseFn).setCoder(coder).build();
   }
 
+  /**
+   * Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream,
+   * mapping each {@link PubsubMessage}, with attributes, into type T using the supplied parse
+   * function and coder. Similar to {@link #readMessagesWithCoderAndParseFn(Coder, SimpleFunction)},
+   * but with the addition of making the message attributes available to the parse function.
+   */
+  public static <T> Read<T> readMessagesWithAttributesWithCoderAndParseFn(
+      Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
+    return Read.newBuilder(parseFn).setCoder(coder).setNeedsAttributes(true).build();
+  }
+
   /**
    * Returns a {@link PTransform} that continuously reads binary encoded Avro messages into the Avro
    * {@link GenericRecord} type.
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index abc35d0bb1b..fe6338a501c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -780,6 +780,49 @@ public class PubsubIOTest {
     pipeline.run();
   }
 
+  static class AppendSuffixAttributeToStringPayloadParseFn
+      extends SimpleFunction<PubsubMessage, String> {
+    @Override
+    public String apply(PubsubMessage input) {
+      String payload = new String(input.getPayload(), StandardCharsets.UTF_8);
+      String suffixAttribute = input.getAttributeMap().get("suffix");
+      return payload + suffixAttribute;
+    }
+  }
+
+  private IncomingMessage messageWithSuffixAttribute(String payload, String suffix) {
+    return IncomingMessage.of(
+        com.google.pubsub.v1.PubsubMessage.newBuilder()
+            .setData(ByteString.copyFromUtf8(payload))
+            .putAttributes("suffix", suffix)
+            .build(),
+        1234L,
+        0,
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString());
+  }
+
+  @Test
+  public void testReadMessagesWithAttributesWithCoderAndParseFn() {
+    ImmutableList<IncomingMessage> inputs =
+        ImmutableList.of(
+            messageWithSuffixAttribute("foo", "-some-suffix"),
+            messageWithSuffixAttribute("bar", "-some-other-suffix"));
+    clientFactory = PubsubTestClient.createFactoryForPull(CLOCK, SUBSCRIPTION, 60, inputs);
+
+    PCollection<String> read =
+        pipeline.apply(
+            PubsubIO.readMessagesWithAttributesWithCoderAndParseFn(
+                    StringUtf8Coder.of(), new AppendSuffixAttributeToStringPayloadParseFn())
+                .fromSubscription(SUBSCRIPTION.getPath())
+                .withClock(CLOCK)
+                .withClientFactory(clientFactory));
+
+    List<String> outputs = ImmutableList.of("foo-some-suffix", "bar-some-other-suffix");
+    PAssert.that(read).containsInAnyOrder(outputs);
+    pipeline.run();
+  }
+
   @Test
   public void testDynamicTopicsBounded() throws IOException {
     testDynamicTopics(true);