This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 98d4178baca [Dataflow Streaming] Reuse proto builders on hot path to reduce GC overhead (#36164)
98d4178baca is described below
commit 98d4178bacadf8ba675a9ed27d449d52e609468f
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Sep 17 08:58:40 2025 -0700
[Dataflow Streaming] Reuse proto builders on hot path to reduce GC overhead (#36164)
---
.../beam/runners/dataflow/worker/WindmillSink.java | 42 ++++++++++++++--------
.../client/grpc/GetWorkResponseChunkAssembler.java | 6 +++-
2 files changed, 33 insertions(+), 15 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index ee94bc202ee..aac882cae36 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class WindmillSink<T> extends Sink<WindowedValue<T>> {
+
private WindmillStreamWriter writer;
private final Coder<T> valueCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
@@ -109,6 +110,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
}
public static class Factory implements SinkFactory {
+
@Override
public WindmillSink<?> create(
CloudObject spec,
@@ -133,14 +135,21 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
}
class WindmillStreamWriter implements SinkWriter<WindowedValue<T>> {
+
private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
private final String destinationName;
private final ByteStringOutputStream stream; // Kept across encodes for buffer reuse.
+ // Builders are reused to reduce GC overhead.
+ private final Windmill.Message.Builder messageBuilder;
+ private final Windmill.OutputMessageBundle.Builder outputBuilder;
+
private WindmillStreamWriter(String destinationName) {
this.destinationName = destinationName;
productionMap = new HashMap<>();
stream = new ByteStringOutputStream();
+ messageBuilder = Windmill.Message.newBuilder();
+ outputBuilder = Windmill.OutputMessageBundle.newBuilder();
}
private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
@@ -215,13 +224,15 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
productionMap.put(key, keyedOutput);
}
- Windmill.Message.Builder builder =
- Windmill.Message.newBuilder()
-            .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(data.getTimestamp()))
- .setData(value)
- .setMetadata(metadata);
- keyedOutput.addMessages(builder.build());
-
+ try {
+ messageBuilder
+            .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(data.getTimestamp()))
+ .setData(value)
+ .setMetadata(metadata);
+ keyedOutput.addMessages(messageBuilder.build());
+ } finally {
+ messageBuilder.clear();
+ }
long offsetSize = 0;
if (context.offsetBasedDeduplicationSupported()) {
if (id.size() > 0) {
@@ -263,14 +274,17 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
@Override
public void close() throws IOException {
- Windmill.OutputMessageBundle.Builder outputBuilder =
-        Windmill.OutputMessageBundle.newBuilder().setDestinationStreamId(destinationName);
+ try {
+ outputBuilder.setDestinationStreamId(destinationName);
- for (Windmill.KeyedMessageBundle.Builder keyedOutput : productionMap.values()) {
- outputBuilder.addBundles(keyedOutput.build());
- }
- if (outputBuilder.getBundlesCount() > 0) {
- context.getOutputBuilder().addOutputMessages(outputBuilder.build());
+ for (Windmill.KeyedMessageBundle.Builder keyedOutput : productionMap.values()) {
+ outputBuilder.addBundles(keyedOutput.build());
+ }
+ if (outputBuilder.getBundlesCount() > 0) {
+ context.getOutputBuilder().addOutputMessages(outputBuilder.build());
+ }
+ } finally {
+ outputBuilder.clear();
}
productionMap.clear();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
index f978bad01e6..0ebb4726d3a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
@@ -51,6 +51,7 @@ final class GetWorkResponseChunkAssembler {
private final GetWorkTimingInfosTracker workTimingInfosTracker;
private @Nullable ComputationMetadata metadata;
+ private final WorkItem.Builder workItemBuilder; // Reused to reduce GC overhead.
private ByteString data;
private long bufferedSize;
@@ -59,6 +60,7 @@ final class GetWorkResponseChunkAssembler {
data = ByteString.EMPTY;
bufferedSize = 0;
metadata = null;
+ workItemBuilder = WorkItem.newBuilder();
}
/**
@@ -94,15 +96,17 @@ final class GetWorkResponseChunkAssembler {
*/
private Optional<AssembledWorkItem> flushToWorkItem() {
try {
+ workItemBuilder.mergeFrom(data);
return Optional.of(
AssembledWorkItem.create(
- WorkItem.parseFrom(data.newInput()),
+ workItemBuilder.build(),
Preconditions.checkNotNull(metadata),
workTimingInfosTracker.getLatencyAttributions(),
bufferedSize));
} catch (IOException e) {
LOG.error("Failed to parse work item from stream: ", e);
} finally {
+ workItemBuilder.clear();
workTimingInfosTracker.reset();
data = ByteString.EMPTY;
bufferedSize = 0;