scwhittle commented on code in PR #36165: URL: https://github.com/apache/beam/pull/36165#discussion_r2355460176
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java: ########## @@ -18,30 +18,65 @@ package org.apache.beam.runners.dataflow.worker.windmill.state; import java.io.IOException; +import java.lang.ref.SoftReference; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; class WindmillStateUtil { + private static final ThreadLocal<@Nullable SoftReference<@Nullable ByteStringOutputStream>> + threadLocalOutputStream = new ThreadLocal<>(); + // True when threadLocalOutputStream is already in use by the current thread. + // Used to avoid reusing the same stream from nested calls if any. + private static final ThreadLocal<@Nullable Boolean> threadLocalOutputStreamInUse = + new ThreadLocal<>(); + /** Encodes the given namespace and address as {@code <namespace>+<address>}. */ @VisibleForTesting static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) { + // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys + // a lot, and this leads to better performance results. See associated benchmarks. + ByteStringOutputStream stream; + boolean releaseThreadLocal = false; + if (Boolean.TRUE.equals(threadLocalOutputStreamInUse.get())) { + stream = new ByteStringOutputStream(); + } else { + stream = getByteStringOutputStreamFromThreadLocal(); + threadLocalOutputStreamInUse.set(true); + releaseThreadLocal = true; + } try { - // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys - // a lot, and this leads to better performance results. See associated benchmarks. 
- ByteStringOutputStream stream = new ByteStringOutputStream(); // stringKey starts and ends with a slash. We separate it from the // StateTag ID by a '+' (which is guaranteed not to be in the stringKey) because the // ID comes from the user. namespace.appendTo(stream); stream.append('+'); address.appendTo(stream); - return stream.toByteString(); + return stream.toByteStringAndReset(); } catch (IOException e) { throw new RuntimeException(e); + } finally { + if (stream.size() > 0) { + stream.toByteStringAndReset(); Review Comment: We could add a reset or clear method that is cheap when the stream is empty, and perhaps remove the size check. It could also be used in the Windmill sink exception handling. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java: ########## @@ -18,30 +18,65 @@ package org.apache.beam.runners.dataflow.worker.windmill.state; import java.io.IOException; +import java.lang.ref.SoftReference; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; class WindmillStateUtil { + private static final ThreadLocal<@Nullable SoftReference<@Nullable ByteStringOutputStream>> + threadLocalOutputStream = new ThreadLocal<>(); + // True when threadLocalOutputStream is already in use by the current thread. + // Used to avoid reusing the same stream from nested calls if any. 
+ private static final ThreadLocal<@Nullable Boolean> threadLocalOutputStreamInUse = Review Comment: Instead of two thread-locals, have a single thread-local containing a holder object with a bool and a soft reference to the ByteStringOutputStream. That also means you can get the holder once and then update it, instead of going through the thread-local again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org