This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 26ba4a733e1 [Java Dataflow Streaming] Fix IllegalStateException in
WindmillSink by clearing Stream after coder exceptions (#31914) (#35942)
26ba4a733e1 is described below
commit 26ba4a733e10a8bedd992ab053fe5f4d1d6a2c43
Author: Suvrat Acharya <[email protected]>
AuthorDate: Mon Aug 25 18:40:13 2025 +0530
[Java Dataflow Streaming] Fix IllegalStateException in WindmillSink by
clearing Stream after coder exceptions (#31914) (#35942)
---
.../org/apache/beam/runners/dataflow/worker/WindmillSink.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 31782634ce4..ee94bc202ee 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -148,8 +148,13 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
stream.size() == 0,
"Expected output stream to be empty but had %s",
stream.toByteString());
- coder.encode(object, stream, Coder.Context.OUTER);
- return stream.toByteStringAndReset();
+ try {
+ coder.encode(object, stream, Coder.Context.OUTER);
+ return stream.toByteStringAndReset();
+ } catch (Exception e) {
+ stream.toByteStringAndReset();
+ throw e;
+ }
}
@Override