arunpandianp commented on code in PR #36165:
URL: https://github.com/apache/beam/pull/36165#discussion_r2354857481
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java:
##########
@@ -18,30 +18,50 @@
package org.apache.beam.runners.dataflow.worker.windmill.state;
import java.io.IOException;
+import java.lang.ref.SoftReference;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
class WindmillStateUtil {
+ private static final ThreadLocal<@Nullable SoftReference<@Nullable
ByteStringOutputStream>>
+ threadLocalOutputStream = new ThreadLocal<>();
+
/** Encodes the given namespace and address as {@code
<namespace>+<address>}. */
@VisibleForTesting
static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
+ // Use ByteStringOutputStream rather than concatenation and String.format.
We build these keys
+ // a lot, and this leads to better performance results. See associated
benchmarks.
+ ByteStringOutputStream stream = getByteStringOutputStream();
try {
- // Use ByteStringOutputStream rather than concatenation and
String.format. We build these keys
- // a lot, and this leads to better performance results. See associated
benchmarks.
- ByteStringOutputStream stream = new ByteStringOutputStream();
// stringKey starts and ends with a slash. We separate it from the
// StateTag ID by a '+' (which is guaranteed not to be in the stringKey)
because the
// ID comes from the user.
namespace.appendTo(stream);
stream.append('+');
address.appendTo(stream);
- return stream.toByteString();
+ return stream.toByteStringAndReset();
} catch (IOException e) {
+ stream.toByteStringAndReset();
throw new RuntimeException(e);
+ } catch (RuntimeException e) {
+ stream.toByteStringAndReset();
+ throw e;
+ }
+ }
+
+ private static ByteStringOutputStream getByteStringOutputStream() {
Review Comment:
Added nested-call detection to the class.
I tried creating a ThreadLocalByteStringOutputStream implementing
AutoCloseable, but close() in OutputStream conflicts with the custom close
logic that we want to add, so I added the logic to WindmillStateUtil directly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]