This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 26ba4a733e1 [Java Dataflow Streaming] Fix IllegalStateException in WindmillSink by clearing Stream after coder exceptions (#31914) (#35942)
26ba4a733e1 is described below

commit 26ba4a733e10a8bedd992ab053fe5f4d1d6a2c43
Author: Suvrat Acharya <140749446+suvrat1...@users.noreply.github.com>
AuthorDate: Mon Aug 25 18:40:13 2025 +0530

    [Java Dataflow Streaming] Fix IllegalStateException in WindmillSink by clearing Stream after coder exceptions (#31914) (#35942)
---
 .../org/apache/beam/runners/dataflow/worker/WindmillSink.java    | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 31782634ce4..ee94bc202ee 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -148,8 +148,13 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
           stream.size() == 0,
           "Expected output stream to be empty but had %s",
           stream.toByteString());
-      coder.encode(object, stream, Coder.Context.OUTER);
-      return stream.toByteStringAndReset();
+      try {
+        coder.encode(object, stream, Coder.Context.OUTER);
+        return stream.toByteStringAndReset();
+      } catch (Exception e) {
+        stream.toByteStringAndReset();
+        throw e;
+      }
     }
 
     @Override

Reply via email to