This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 98d4178baca [Dataflow Streaming] Reuse proto builders on hot path to reduce GC overhead (#36164)
98d4178baca is described below

commit 98d4178bacadf8ba675a9ed27d449d52e609468f
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Sep 17 08:58:40 2025 -0700

    [Dataflow Streaming] Reuse proto builders on hot path to reduce GC overhead (#36164)
---
 .../beam/runners/dataflow/worker/WindmillSink.java | 42 ++++++++++++++--------
 .../client/grpc/GetWorkResponseChunkAssembler.java |  6 +++-
 2 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index ee94bc202ee..aac882cae36 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 class WindmillSink<T> extends Sink<WindowedValue<T>> {
+
   private WindmillStreamWriter writer;
   private final Coder<T> valueCoder;
   private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
@@ -109,6 +110,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
   }
 
   public static class Factory implements SinkFactory {
+
     @Override
     public WindmillSink<?> create(
         CloudObject spec,
@@ -133,14 +135,21 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
   }
 
   class WindmillStreamWriter implements SinkWriter<WindowedValue<T>> {
+
     private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
     private final String destinationName;
     private final ByteStringOutputStream stream; // Kept across encodes for buffer reuse.
 
+    // Builders are reused to reduce GC overhead.
+    private final Windmill.Message.Builder messageBuilder;
+    private final Windmill.OutputMessageBundle.Builder outputBuilder;
+
     private WindmillStreamWriter(String destinationName) {
       this.destinationName = destinationName;
       productionMap = new HashMap<>();
       stream = new ByteStringOutputStream();
+      messageBuilder = Windmill.Message.newBuilder();
+      outputBuilder = Windmill.OutputMessageBundle.newBuilder();
     }
 
     private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
@@ -215,13 +224,15 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
         productionMap.put(key, keyedOutput);
       }
 
-      Windmill.Message.Builder builder =
-          Windmill.Message.newBuilder()
-              .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(data.getTimestamp()))
-              .setData(value)
-              .setMetadata(metadata);
-      keyedOutput.addMessages(builder.build());
-
+      try {
+        messageBuilder
+            .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(data.getTimestamp()))
+            .setData(value)
+            .setMetadata(metadata);
+        keyedOutput.addMessages(messageBuilder.build());
+      } finally {
+        messageBuilder.clear();
+      }
       long offsetSize = 0;
       if (context.offsetBasedDeduplicationSupported()) {
         if (id.size() > 0) {
@@ -263,14 +274,17 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
 
     @Override
     public void close() throws IOException {
-      Windmill.OutputMessageBundle.Builder outputBuilder =
-          Windmill.OutputMessageBundle.newBuilder().setDestinationStreamId(destinationName);
+      try {
+        outputBuilder.setDestinationStreamId(destinationName);
 
-      for (Windmill.KeyedMessageBundle.Builder keyedOutput : productionMap.values()) {
-        outputBuilder.addBundles(keyedOutput.build());
-      }
-      if (outputBuilder.getBundlesCount() > 0) {
-        context.getOutputBuilder().addOutputMessages(outputBuilder.build());
+        for (Windmill.KeyedMessageBundle.Builder keyedOutput : productionMap.values()) {
+          outputBuilder.addBundles(keyedOutput.build());
+        }
+        if (outputBuilder.getBundlesCount() > 0) {
+          context.getOutputBuilder().addOutputMessages(outputBuilder.build());
+        }
+      } finally {
+        outputBuilder.clear();
       }
       productionMap.clear();
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
index f978bad01e6..0ebb4726d3a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
@@ -51,6 +51,7 @@ final class GetWorkResponseChunkAssembler {
 
   private final GetWorkTimingInfosTracker workTimingInfosTracker;
   private @Nullable ComputationMetadata metadata;
+  private final WorkItem.Builder workItemBuilder; // Reused to reduce GC overhead.
   private ByteString data;
   private long bufferedSize;
 
@@ -59,6 +60,7 @@ final class GetWorkResponseChunkAssembler {
     data = ByteString.EMPTY;
     bufferedSize = 0;
     metadata = null;
+    workItemBuilder = WorkItem.newBuilder();
   }
 
   /**
@@ -94,15 +96,17 @@ final class GetWorkResponseChunkAssembler {
    */
   private Optional<AssembledWorkItem> flushToWorkItem() {
     try {
+      workItemBuilder.mergeFrom(data);
       return Optional.of(
           AssembledWorkItem.create(
-              WorkItem.parseFrom(data.newInput()),
+              workItemBuilder.build(),
               Preconditions.checkNotNull(metadata),
               workTimingInfosTracker.getLatencyAttributions(),
               bufferedSize));
     } catch (IOException e) {
       LOG.error("Failed to parse work item from stream: ", e);
     } finally {
+      workItemBuilder.clear();
       workTimingInfosTracker.reset();
       data = ByteString.EMPTY;
       bufferedSize = 0;

Reply via email to