This is an automated email from the ASF dual-hosted git repository. scwhittle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 26ba4a733e1 [Java Dataflow Streaming] Fix IllegalStateException in WindmillSink by clearing Stream after coder exceptions (#31914) (#35942)
26ba4a733e1 is described below

commit 26ba4a733e10a8bedd992ab053fe5f4d1d6a2c43
Author: Suvrat Acharya <140749446+suvrat1...@users.noreply.github.com>
AuthorDate: Mon Aug 25 18:40:13 2025 +0530

    [Java Dataflow Streaming] Fix IllegalStateException in WindmillSink by clearing Stream after coder exceptions (#31914) (#35942)
---
 .../org/apache/beam/runners/dataflow/worker/WindmillSink.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 31782634ce4..ee94bc202ee 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -148,8 +148,13 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
           stream.size() == 0,
           "Expected output stream to be empty but had %s",
           stream.toByteString());
-      coder.encode(object, stream, Coder.Context.OUTER);
-      return stream.toByteStringAndReset();
+      try {
+        coder.encode(object, stream, Coder.Context.OUTER);
+        return stream.toByteStringAndReset();
+      } catch (Exception e) {
+        stream.toByteStringAndReset();
+        throw e;
+      }
     }
 
     @Override