arunpandianp commented on code in PR #36165:
URL: https://github.com/apache/beam/pull/36165#discussion_r2354857481


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java:
##########
@@ -18,30 +18,50 @@
 package org.apache.beam.runners.dataflow.worker.windmill.state;
 
 import java.io.IOException;
+import java.lang.ref.SoftReference;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 class WindmillStateUtil {
 
+  private static final ThreadLocal<@Nullable SoftReference<@Nullable ByteStringOutputStream>>
+      threadLocalOutputStream = new ThreadLocal<>();
+
   /** Encodes the given namespace and address as {@code &lt;namespace&gt;+&lt;address&gt;}. */
   @VisibleForTesting
   static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
+    // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys
+    // a lot, and this leads to better performance results. See associated benchmarks.
+    ByteStringOutputStream stream = getByteStringOutputStream();
     try {
-      // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys
-      // a lot, and this leads to better performance results. See associated benchmarks.
-      ByteStringOutputStream stream = new ByteStringOutputStream();
       // stringKey starts and ends with a slash.  We separate it from the
       // StateTag ID by a '+' (which is guaranteed not to be in the stringKey) because the
       // ID comes from the user.
       namespace.appendTo(stream);
       stream.append('+');
       address.appendTo(stream);
-      return stream.toByteString();
+      return stream.toByteStringAndReset();
     } catch (IOException e) {
+      stream.toByteStringAndReset();
       throw new RuntimeException(e);
+    } catch (RuntimeException e) {
+      stream.toByteStringAndReset();
+      throw e;
+    }
+  }
+
+  private static ByteStringOutputStream getByteStringOutputStream() {

Review Comment:
   Added nested call detection to the class.
   
   I tried creating a ThreadLocalByteStringOutputStream implementing 
AutoCloseable, but close() in OutputStream conflicts with the custom close 
logic that we want to add, so I added the logic to WindmillStateUtil directly.
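
For readers following along, here is a rough sketch of what "nested call detection" on a thread-local, softly referenced buffer can look like. It is not the code in this PR: `ThreadLocalBufferSketch`, `Holder`, and `encode` are hypothetical names, and `java.io.ByteArrayOutputStream` stands in for `ByteStringOutputStream` so the example compiles without Beam on the classpath. The assumption is that a nested call on the same thread should fall back to a throwaway buffer rather than corrupt the one already in use.

```java
import java.io.ByteArrayOutputStream;
import java.lang.ref.SoftReference;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class ThreadLocalBufferSketch {

  /** Per-thread buffer plus a flag recording whether it is currently lent out. */
  private static final class Holder {
    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
    boolean inUse;
  }

  // SoftReference lets the GC reclaim an idle buffer under memory pressure.
  private static final ThreadLocal<SoftReference<Holder>> CACHE = new ThreadLocal<>();

  private ThreadLocalBufferSketch() {}

  /** Runs {@code writer} against a reusable buffer and returns the bytes written. */
  static byte[] encode(Consumer<ByteArrayOutputStream> writer) {
    Holder holder = getOrCreateHolder();
    if (holder.inUse) {
      // Nested call on this thread: the cached buffer holds the outer call's
      // partial output, so use a throwaway buffer instead (assumed policy).
      ByteArrayOutputStream fresh = new ByteArrayOutputStream();
      writer.accept(fresh);
      return fresh.toByteArray();
    }
    holder.inUse = true;
    try {
      writer.accept(holder.stream);
      return holder.stream.toByteArray();
    } finally {
      // Always reset, even when the writer throws, so the next caller on this
      // thread starts from an empty buffer.
      holder.stream.reset();
      holder.inUse = false;
    }
  }

  private static Holder getOrCreateHolder() {
    SoftReference<Holder> ref = CACHE.get();
    Holder holder = ref == null ? null : ref.get();
    if (holder == null) {
      // First use on this thread, or the GC cleared the soft reference.
      holder = new Holder();
      CACHE.set(new SoftReference<>(holder));
    }
    return holder;
  }

  /** Mirrors the {@code <namespace>+<address>} layout using plain strings. */
  static byte[] encodeKey(String namespace, String address) {
    return encode(out -> {
      byte[] ns = namespace.getBytes(StandardCharsets.UTF_8);
      byte[] addr = address.getBytes(StandardCharsets.UTF_8);
      out.write(ns, 0, ns.length);
      out.write('+');
      out.write(addr, 0, addr.length);
    });
  }
}
```

Keeping the in-use flag next to the buffer, rather than wrapping the stream in an `AutoCloseable`, sidesteps the clash with `OutputStream.close()` mentioned above; the `finally` block plays the role that the custom close logic would have played.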


