This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e5db1b3c63e [Dataflow Java Streaming] Reset state using finally blocks
instead of catching Exception, in cases where it may otherwise corrupt
data structures if an OutOfMemoryError is thrown. (#37746)
e5db1b3c63e is described below
commit e5db1b3c63e2ab8f59ca9bb36b18201aab3cb1f3
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Mar 4 09:27:24 2026 +0000
[Dataflow Java Streaming] Reset state using finally blocks instead of
catching Exception, in cases where it may otherwise corrupt data structures if
an OutOfMemoryError is thrown. (#37746)
---
.../beam/runners/dataflow/worker/WindmillSink.java | 54 +++++++++++++---------
.../windmill/client/AbstractWindmillStream.java | 39 +++++++++-------
.../windmill/client/grpc/GrpcGetDataStream.java | 14 +++---
3 files changed, 61 insertions(+), 46 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 5cb3cb56d9e..2ed29125bd4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.service.AutoService;
import java.io.IOException;
@@ -51,16 +52,12 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@SuppressWarnings({
- "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
class WindmillSink<T> extends Sink<WindowedValue<T>> {
- private WindmillStreamWriter writer;
+ private final WindmillStreamWriter writer;
private final Coder<T> valueCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
- private StreamingModeExecutionContext context;
+ private final StreamingModeExecutionContext context;
private static final Logger LOG =
LoggerFactory.getLogger(WindmillSink.class);
WindmillSink(
@@ -81,6 +78,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
PaneInfo paneInfo,
BeamFnApi.Elements.ElementMetadata metadata)
throws IOException {
+ boolean resetNeeded = true;
try {
// element metadata is behind the experiment
boolean elementMetadata =
WindowedValues.WindowedValueCoder.isMetadataSupported();
@@ -92,10 +90,13 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
PaneInfoCoder.INSTANCE.encode(paneInfo, stream);
windowsCoder.encode(windows, stream, Coder.Context.OUTER);
}
- return stream.toByteStringAndReset();
- } catch (Exception e) {
- stream.reset();
- throw e;
+ ByteString result = stream.toByteStringAndReset();
+ resetNeeded = false;
+ return result;
+ } finally {
+ if (resetNeeded) {
+ stream.reset();
+ }
}
}
@@ -150,6 +151,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
}
}
+ @SuppressWarnings("rawtypes")
public static class Factory implements SinkFactory {
@Override
@@ -166,7 +168,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
return new WindmillSink<>(
getString(spec, "stream_id"),
typedCoder,
- (StreamingModeExecutionContext) executionContext);
+ checkNotNull((StreamingModeExecutionContext) executionContext));
}
}
@@ -198,17 +200,21 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
throw new IllegalStateException(
"Expected output stream to be empty but had " +
stream.toByteString());
}
+ boolean resetNeeded = true;
try {
coder.encode(object, stream, Coder.Context.OUTER);
- return stream.toByteStringAndReset();
- } catch (Exception e) {
- stream.reset();
- throw e;
+ ByteString result = stream.toByteStringAndReset();
+ resetNeeded = false;
+ return result;
+ } finally {
+ if (resetNeeded) {
+ stream.reset();
+ }
}
}
@Override
- @SuppressWarnings("NestedInstanceOfConditions")
+ @SuppressWarnings({"rawtypes", "NestedInstanceOfConditions"})
public long add(WindowedValue<T> data) throws IOException {
ByteString key, value;
ByteString id = ByteString.EMPTY;
@@ -220,13 +226,13 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
stream, windowsCoder, data.getWindows(), data.getPaneInfo(),
additionalMetadata);
if (valueCoder instanceof KvCoder) {
KvCoder kvCoder = (KvCoder) valueCoder;
- KV kv = (KV) data.getValue();
+ KV kv = checkNotNull((KV) data.getValue());
key = encode(kvCoder.getKeyCoder(), kv.getKey());
Coder valueCoder = kvCoder.getValueCoder();
// If ids are explicitly provided, use that instead of the
windmill-generated id.
// This is used when reading an UnboundedSource to deduplicate records.
if (valueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) {
- ValueWithRecordId valueAndId = (ValueWithRecordId) kv.getValue();
+ ValueWithRecordId valueAndId = checkNotNull((ValueWithRecordId)
kv.getValue());
value =
encode(((ValueWithRecordIdCoder) valueCoder).getValueCoder(),
valueAndId.getValue());
id = ByteString.copyFrom(valueAndId.getId());
@@ -234,7 +240,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
value = encode(valueCoder, kv.getValue());
}
} else {
- key = context.getSerializedKey();
+ key = checkNotNull(context.getSerializedKey());
value = encode(valueCoder, data.getValue());
}
if (key.size() > context.getMaxOutputKeyBytes()) {
@@ -291,8 +297,9 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
}
byte[] rawId = null;
- if (data.getRecordId() != null) {
- rawId = data.getRecordId().getBytes(StandardCharsets.UTF_8);
+ @Nullable String recordId = data.getRecordId();
+ if (recordId != null) {
+ rawId = recordId.getBytes(StandardCharsets.UTF_8);
} else {
rawId = context.getCurrentRecordId();
}
@@ -303,8 +310,9 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
id = ByteString.copyFrom(rawId);
byte[] rawOffset = null;
- if (data.getRecordOffset() != null) {
- rawOffset = Longs.toByteArray(data.getRecordOffset());
+ @Nullable Long recordOffset = data.getRecordOffset();
+ if (recordOffset != null) {
+ rawOffset = Longs.toByteArray(recordOffset);
} else {
rawOffset = context.getCurrentRecordOffset();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index ed99ae1bbd6..cc3f555a115 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -268,29 +268,36 @@ public abstract class AbstractWindmillStream<RequestT,
ResponseT> implements Win
debugMetrics.recordStart();
streamHandler.streamDebugMetrics.recordStart();
currentPhysicalStream = streamHandler;
- currentPhysicalStreamForDebug.set(currentPhysicalStream);
- requestObserver.reset(physicalStreamFactory.apply(new
ResponseObserver(streamHandler)));
- onFlushPending(true);
- if (clientClosed) {
- // The logical stream is half-closed so after flushing the
remaining requests close the
- // physical stream.
- streamHandler.streamDebugMetrics.recordHalfClose();
- requestObserver.onCompleted();
- } else if (!halfClosePhysicalStreamAfter.isZero()) {
- halfCloseFuture =
- executor.schedule(
- () -> onHalfClosePhysicalStreamTimeout(streamHandler),
- halfClosePhysicalStreamAfter.getSeconds(),
- TimeUnit.SECONDS);
+ boolean resetCurrentPhysicalStream = true;
+ try {
+ currentPhysicalStreamForDebug.set(currentPhysicalStream);
+ requestObserver.reset(physicalStreamFactory.apply(new
ResponseObserver(streamHandler)));
+ onFlushPending(true);
+ if (clientClosed) {
+ // The logical stream is half-closed so after flushing the
remaining requests close
+ // the
+ // physical stream.
+ streamHandler.streamDebugMetrics.recordHalfClose();
+ requestObserver.onCompleted();
+ } else if (!halfClosePhysicalStreamAfter.isZero()) {
+ halfCloseFuture =
+ executor.schedule(
+ () -> onHalfClosePhysicalStreamTimeout(streamHandler),
+ halfClosePhysicalStreamAfter.getSeconds(),
+ TimeUnit.SECONDS);
+ }
+ resetCurrentPhysicalStream = false;
+ } finally {
+ if (resetCurrentPhysicalStream) {
+ clearCurrentPhysicalStream(true);
+ }
}
return;
} catch (WindmillStreamShutdownException e) {
logger.debug("Stream was shutdown while creating new stream.", e);
- clearCurrentPhysicalStream(true);
break;
} catch (Exception e) {
logger.error("Failed to create new stream, retrying: ", e);
- clearCurrentPhysicalStream(true);
debugMetrics.recordRestartReason("Failed to create new stream,
retrying: " + e);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
index bd1c9eed408..9503893e2cf 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
@@ -289,11 +289,11 @@ final class GrpcGetDataStream
// Notify all waiters with requests in this batch as well as the sender
// of the next batch (if one exists).
batch.notifySent();
- } catch (Exception e) {
- LOG.debug("Batch failed to send on new stream", e);
+ } catch (Throwable t) {
// Free waiters if the send() failed.
batch.notifyFailed();
- throw e;
+ LOG.debug("Batch failed to send on new stream", t);
+ throw t;
}
}
}
@@ -535,12 +535,12 @@ final class GrpcGetDataStream
// Notify all waiters with requests in this batch as well as the sender
// of the next batch (if one exists).
batch.notifySent();
- } catch (Exception e) {
- LOG.debug("Batch failed to send", e);
+ } catch (Throwable t) {
// Free waiters if the send() failed.
batch.notifyFailed();
- // Propagate the exception to the calling thread.
- throw e;
+ LOG.debug("Batch failed to send", t);
+ // Propagate the exception/error to the calling thread.
+ throw t;
}
}