This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a7f19d9973 [Dataflow Streaming] Improve exception handling in PubsubSink to reset stream to prevent precondition exception on reuse (#34418)
4a7f19d9973 is described below

commit 4a7f19d997395264aaed32adde544fa340ce6124
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Mar 25 13:28:43 2025 +0000

    [Dataflow Streaming] Improve exception handling in PubsubSink to reset stream to prevent precondition exception on reuse (#34418)
---
 .../beam/runners/dataflow/worker/PubsubSink.java   | 24 +++++----
 .../runners/dataflow/worker/PubsubSinkTest.java    | 59 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 10 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
index 3d542da3a4b..b4962422b37 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
@@ -161,17 +161,21 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
           "Expected output stream to be empty but had %s",
           stream.toByteString());
       ByteString byteString = null;
-      if (formatFn != null) {
-        PubsubMessage formatted = formatFn.apply(data.getValue());
-        Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
-            Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));
-        if (formatted.getAttributeMap() != null) {
-          pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
+      try {
+        if (formatFn != null) {
+          PubsubMessage formatted = formatFn.apply(data.getValue());
+          Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
+              Pubsub.PubsubMessage.newBuilder()
+                  .setData(ByteString.copyFrom(formatted.getPayload()));
+          if (formatted.getAttributeMap() != null) {
+            pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
+          }
+          pubsubMessageBuilder.build().writeTo(stream);
+        } else {
+          coder.encode(data.getValue(), stream, Coder.Context.OUTER);
         }
-        pubsubMessageBuilder.build().writeTo(stream);
-        byteString = stream.toByteStringAndReset();
-      } else {
-        coder.encode(data.getValue(), stream, Coder.Context.OUTER);
+      } finally {
+        // Use a final block to ensure the stream is reset even in the case of an exception.
         byteString = stream.toByteStringAndReset();
       }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubSinkTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubSinkTest.java
index fdeecb8d96e..a03f7bc24a6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubSinkTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubSinkTest.java
@@ -18,14 +18,21 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -117,4 +124,56 @@ public class PubsubSinkTest {
   public void testEmptyParseFn() throws Exception {
     testWriteWith("");
   }
+
+  private static class ErrorCoder extends Coder<String> {
+    @Override
+    public void encode(String value, OutputStream outStream) throws CoderException, IOException {
+      outStream.write(1);
+      throw new CoderException("encode error");
+    }
+
+    @Override
+    public String decode(InputStream inStream) throws IOException {
+      throw new CoderException("decode error");
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return null;
+    }
+
+    @Override
+    public void verifyDeterministic() {}
+  }
+
+  // Regression test that the PubsubSink properly resets internal state on encoding exceptions to
+  // prevent precondition failures on further output.
+  @Test
+  public void testExceptionAfterEncoding() throws Exception {
+    Map<String, Object> spec = new HashMap<>();
+    spec.put(PropertyNames.OBJECT_TYPE_NAME, "");
+    spec.put(PropertyNames.PUBSUB_TOPIC, "topic");
+    spec.put(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "ts");
+    spec.put(PropertyNames.PUBSUB_ID_ATTRIBUTE, "id");
+    CloudObject cloudSinkSpec = CloudObject.fromSpec(spec);
+    PubsubSink.Factory factory = new PubsubSink.Factory();
+    PubsubSink<String> sink =
+        (PubsubSink<String>)
+            factory.create(
+                cloudSinkSpec,
+                WindowedValue.getFullCoder(new ErrorCoder(), IntervalWindow.getCoder()),
+                null,
+                mockContext,
+                null);
+
+    Sink.SinkWriter<WindowedValue<String>> writer = sink.writer();
+    assertThrows(
+        "encode error",
+        CoderException.class,
+        () -> writer.add(WindowedValue.timestampedValueInGlobalWindow("e0", new Instant(0))));
+    assertThrows(
+        "encode error",
+        CoderException.class,
+        () -> writer.add(WindowedValue.timestampedValueInGlobalWindow("e0", new Instant(0))));
+  }
 }

Reply via email to