This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 88af35e6df4 PubsubIO: Add 
readMessagesWithAttributesWithCoderAndParseFn (#31206)
88af35e6df4 is described below

commit 88af35e6df4de91a05b6e637ce840742700187c3
Author: Maja Kontrec Rönn <[email protected]>
AuthorDate: Tue May 21 12:22:31 2024 +0200

    PubsubIO: Add readMessagesWithAttributesWithCoderAndParseFn (#31206)
---
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    | 11 ++++++
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java       | 43 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 1d687812560..01848d92d92 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -671,6 +671,17 @@ public class PubsubIO {
     return Read.newBuilder(parseFn).setCoder(coder).build();
   }
 
+  /**
+   * Returns a {@link PTransform} that continuously reads from a Google Cloud 
Pub/Sub stream,
+   * mapping each {@link PubsubMessage}, with attributes, into type T using 
the supplied parse
+   * function and coder. Similar to {@link 
#readMessagesWithCoderAndParseFn(Coder, SimpleFunction)},
+   * but with the addition of making the message attributes available to 
the ParseFn.
+   */
+  public static <T> Read<T> readMessagesWithAttributesWithCoderAndParseFn(
+      Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
+    return 
Read.newBuilder(parseFn).setCoder(coder).setNeedsAttributes(true).build();
+  }
+
   /**
    * Returns a {@link PTransform} that continuously reads binary encoded Avro 
messages into the Avro
    * {@link GenericRecord} type.
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index abc35d0bb1b..fe6338a501c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -780,6 +780,49 @@ public class PubsubIOTest {
     pipeline.run();
   }
 
+  static class AppendSuffixAttributeToStringPayloadParseFn
+      extends SimpleFunction<PubsubMessage, String> {
+    @Override
+    public String apply(PubsubMessage input) {
+      String payload = new String(input.getPayload(), StandardCharsets.UTF_8);
+      String suffixAttribute = input.getAttributeMap().get("suffix");
+      return payload + suffixAttribute;
+    }
+  }
+
+  private IncomingMessage messageWithSuffixAttribute(String payload, String 
suffix) {
+    return IncomingMessage.of(
+        com.google.pubsub.v1.PubsubMessage.newBuilder()
+            .setData(ByteString.copyFromUtf8(payload))
+            .putAttributes("suffix", suffix)
+            .build(),
+        1234L,
+        0,
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString());
+  }
+
+  @Test
+  public void testReadMessagesWithAttributesWithCoderAndParseFn() {
+    ImmutableList<IncomingMessage> inputs =
+        ImmutableList.of(
+            messageWithSuffixAttribute("foo", "-some-suffix"),
+            messageWithSuffixAttribute("bar", "-some-other-suffix"));
+    clientFactory = PubsubTestClient.createFactoryForPull(CLOCK, SUBSCRIPTION, 
60, inputs);
+
+    PCollection<String> read =
+        pipeline.apply(
+            PubsubIO.readMessagesWithAttributesWithCoderAndParseFn(
+                    StringUtf8Coder.of(), new 
AppendSuffixAttributeToStringPayloadParseFn())
+                .fromSubscription(SUBSCRIPTION.getPath())
+                .withClock(CLOCK)
+                .withClientFactory(clientFactory));
+
+    List<String> outputs = ImmutableList.of("foo-some-suffix", 
"bar-some-other-suffix");
+    PAssert.that(read).containsInAnyOrder(outputs);
+    pipeline.run();
+  }
+
   @Test
   public void testDynamicTopicsBounded() throws IOException {
     testDynamicTopics(true);

Reply via email to