This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4a7f19d9973 [Dataflow Streaming] Improve exception handling in
PubsubSink to reset stream to prevent precondition exception on reuse (#34418)
4a7f19d9973 is described below
commit 4a7f19d997395264aaed32adde544fa340ce6124
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Mar 25 13:28:43 2025 +0000
[Dataflow Streaming] Improve exception handling in PubsubSink to reset
stream to prevent precondition exception on reuse (#34418)
---
.../beam/runners/dataflow/worker/PubsubSink.java | 24 +++++----
.../runners/dataflow/worker/PubsubSinkTest.java | 59 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 10 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
index 3d542da3a4b..b4962422b37 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
@@ -161,17 +161,21 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
"Expected output stream to be empty but had %s",
stream.toByteString());
ByteString byteString = null;
- if (formatFn != null) {
- PubsubMessage formatted = formatFn.apply(data.getValue());
- Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
- Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));
- if (formatted.getAttributeMap() != null) {
- pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
+ try {
+ if (formatFn != null) {
+ PubsubMessage formatted = formatFn.apply(data.getValue());
+ Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
+ Pubsub.PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(formatted.getPayload()));
+ if (formatted.getAttributeMap() != null) {
+ pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
+ }
+ pubsubMessageBuilder.build().writeTo(stream);
+ } else {
+ coder.encode(data.getValue(), stream, Coder.Context.OUTER);
}
- pubsubMessageBuilder.build().writeTo(stream);
- byteString = stream.toByteStringAndReset();
- } else {
- coder.encode(data.getValue(), stream, Coder.Context.OUTER);
+ } finally {
+ // Use a final block to ensure the stream is reset even in the case of an exception.
byteString = stream.toByteStringAndReset();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubSinkTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubSinkTest.java
index fdeecb8d96e..a03f7bc24a6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubSinkTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubSinkTest.java
@@ -18,14 +18,21 @@
package org.apache.beam.runners.dataflow.worker;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -117,4 +124,56 @@ public class PubsubSinkTest {
public void testEmptyParseFn() throws Exception {
testWriteWith("");
}
+
+ private static class ErrorCoder extends Coder<String> {
+ @Override
+ public void encode(String value, OutputStream outStream) throws CoderException, IOException {
+ outStream.write(1);
+ throw new CoderException("encode error");
+ }
+
+ @Override
+ public String decode(InputStream inStream) throws IOException {
+ throw new CoderException("decode error");
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return null;
+ }
+
+ @Override
+ public void verifyDeterministic() {}
+ }
+
+ // Regression test that the PubsubSink properly resets internal state on encoding exceptions to
+ // prevent precondition failures on further output.
+ @Test
+ public void testExceptionAfterEncoding() throws Exception {
+ Map<String, Object> spec = new HashMap<>();
+ spec.put(PropertyNames.OBJECT_TYPE_NAME, "");
+ spec.put(PropertyNames.PUBSUB_TOPIC, "topic");
+ spec.put(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "ts");
+ spec.put(PropertyNames.PUBSUB_ID_ATTRIBUTE, "id");
+ CloudObject cloudSinkSpec = CloudObject.fromSpec(spec);
+ PubsubSink.Factory factory = new PubsubSink.Factory();
+ PubsubSink<String> sink =
+ (PubsubSink<String>)
+ factory.create(
+ cloudSinkSpec,
+ WindowedValue.getFullCoder(new ErrorCoder(), IntervalWindow.getCoder()),
+ null,
+ mockContext,
+ null);
+
+ Sink.SinkWriter<WindowedValue<String>> writer = sink.writer();
+ assertThrows(
+ "encode error",
+ CoderException.class,
+ () -> writer.add(WindowedValue.timestampedValueInGlobalWindow("e0", new Instant(0))));
+ assertThrows(
+ "encode error",
+ CoderException.class,
+ () -> writer.add(WindowedValue.timestampedValueInGlobalWindow("e0", new Instant(0))));
+ }
}