This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new addc06e8200 [Dataflow Streaming] Reuse ByteStringOutputStream buffers
in WindmillBag (#36742)
addc06e8200 is described below
commit addc06e82006ce65b05001dc1071b3e710f7866d
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Nov 12 15:39:43 2025 -0800
[Dataflow Streaming] Reuse ByteStringOutputStream buffers in WindmillBag
(#36742)
---
.../util/ThreadLocalByteStringOutputStream.java | 103 +++++++++++++++++++++
.../worker/windmill/state/WindmillBag.java | 25 +++--
.../windmill/state/WindmillStateTagUtil.java | 58 ++----------
.../ThreadLocalByteStringOutputStreamTest.java | 68 ++++++++++++++
4 files changed, 192 insertions(+), 62 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java
new file mode 100644
index 00000000000..8e33be639e4
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import java.lang.ref.SoftReference;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Internal
+@ThreadSafe
+/*
+ * A utility class for caching a thread-local {@link ByteStringOutputStream}.
+ *
+ * Example Usage:
+ * try (StreamHandle streamHandle =
ThreadLocalByteStringOutputStream.acquire()) {
+ * ByteStringOutputStream stream = streamHandle.stream();
+ * stream.write(1);
+ * ByteString byteString = stream.toByteStringAndReset();
+ * }
+ */
+public class ThreadLocalByteStringOutputStream {
+
+ private static final ThreadLocal<@Nullable SoftRefHolder>
threadLocalSoftRefHolder =
+ ThreadLocal.withInitial(SoftRefHolder::new);
+
+ // Private constructor to prevent instantiations from outside.
+ private ThreadLocalByteStringOutputStream() {}
+
+ /** @return An AutoCloseable StreamHandle that holds a cached ByteStringOutputStream. */
+ public static StreamHandle acquire() {
+ StreamHandle streamHandle = getStreamHandleFromThreadLocal();
+ if (streamHandle.inUse) {
+ // Stream is already in use, create a new uncached one
+ return new StreamHandle();
+ }
+ streamHandle.inUse = true;
+ return streamHandle; // inUse will be unset when streamHandle closes.
+ }
+
+ /**
+ * Handle to a thread-local {@link ByteStringOutputStream}. If the thread-local stream is already
+ * in use, a new uncached stream is used instead. The thread-local stream is cached and reused
+ * across calls. Users should not keep a reference to the stream after closing the StreamHandle.
+ */
+ public static class StreamHandle implements AutoCloseable {
+
+ private final ByteStringOutputStream stream = new ByteStringOutputStream();
+
+ private boolean inUse = false;
+
+ /**
+ * Returns the underlying cached ByteStringOutputStream. Callers should not keep a reference to
+ * the stream after closing the StreamHandle.
+ */
+ public ByteStringOutputStream stream() {
+ return stream;
+ }
+
+ @Override
+ public void close() {
+ stream.reset();
+ inUse = false;
+ }
+ }
+
+ private static class SoftRefHolder {
+ private @Nullable SoftReference<StreamHandle> softReference;
+ }
+
+ private static StreamHandle getStreamHandleFromThreadLocal() {
+ // softRefHolder is only set by the ThreadLocal initializer and should not be null.
+ SoftRefHolder softRefHolder =
+ Preconditions.checkArgumentNotNull(threadLocalSoftRefHolder.get());
+ @Nullable StreamHandle streamHandle = null;
+ @Nullable SoftReference<StreamHandle> softReference =
softRefHolder.softReference;
+ if (softReference != null) {
+ streamHandle = softReference.get();
+ }
+ if (streamHandle == null) {
+ streamHandle = new StreamHandle();
+ softRefHolder.softReference = new SoftReference<>(streamHandle);
+ }
+ return streamHandle;
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
index b15064ff81e..db1f3e7a6de 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.beam.runners.core.StateNamespace;
+import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream;
+import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
@@ -165,17 +167,20 @@ public class WindmillBag<T> extends SimpleWindmillState
implements BagState<T> {
if (bagUpdatesBuilder == null) {
bagUpdatesBuilder = commitBuilder.addBagUpdatesBuilder();
}
- for (T value : localAdditions) {
- ByteStringOutputStream stream = new ByteStringOutputStream();
- // Encode the value
- elemCoder.encode(value, stream, Coder.Context.OUTER);
- ByteString encoded = stream.toByteString();
- if (cachedValues != null) {
- // We'll capture this value in the cache below.
- // Capture the value's size now since we have it.
- encodedSize += encoded.size();
+ try (StreamHandle streamHandle =
ThreadLocalByteStringOutputStream.acquire()) {
+ ByteStringOutputStream stream = streamHandle.stream();
+ for (T value : localAdditions) {
+ elemCoder.encode(value, stream, Coder.Context.OUTER);
+ ByteString encoded = stream.toByteStringAndReset();
+ if (cachedValues != null) {
+ // We'll capture this value in the cache below.
+ // Capture the value's size now since we have it.
+ encodedSize += encoded.size();
+ }
+ bagUpdatesBuilder.addValues(encoded);
}
- bagUpdatesBuilder.addValues(encoded);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java
index dbb5f57f8a5..12b4001d530 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java
@@ -18,24 +18,23 @@
package org.apache.beam.runners.dataflow.worker.windmill.state;
import java.io.IOException;
-import java.lang.ref.SoftReference;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
+import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream;
+import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.checkerframework.checker.nullness.qual.Nullable;
@Internal
@ThreadSafe
public class WindmillStateTagUtil {
- private static final ThreadLocal<@Nullable RefHolder> threadLocalRefHolder =
new ThreadLocal<>();
private static final String TIMER_HOLD_PREFIX = "/h";
private static final WindmillStateTagUtil INSTANCE = new
WindmillStateTagUtil();
@@ -48,21 +47,10 @@ public class WindmillStateTagUtil {
*/
@VisibleForTesting
InternedByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
- RefHolder refHolder = getRefHolderFromThreadLocal();
- // Use ByteStringOutputStream rather than concatenation and String.format.
We build these keys
- // a lot, and this leads to better performance results. See associated
benchmarks.
- ByteStringOutputStream stream;
- boolean releaseThreadLocal;
- if (refHolder.inUse) {
- // If the thread local stream is already in use, create a new one
- stream = new ByteStringOutputStream();
- releaseThreadLocal = false;
- } else {
- stream = getByteStringOutputStream(refHolder);
- refHolder.inUse = true;
- releaseThreadLocal = true;
- }
- try {
+ try (StreamHandle streamHandle =
ThreadLocalByteStringOutputStream.acquire()) {
+ // Use ByteStringOutputStream rather than concatenation and
String.format. We build these keys
+ // a lot, and this leads to better performance results. See associated
benchmarks.
+ ByteStringOutputStream stream = streamHandle.stream();
// stringKey starts and ends with a slash. We separate it from the
// StateTag ID by a '+' (which is guaranteed not to be in the stringKey)
because the
// ID comes from the user.
@@ -72,11 +60,6 @@ public class WindmillStateTagUtil {
return InternedByteString.of(stream.toByteStringAndReset());
} catch (IOException e) {
throw new RuntimeException(e);
- } finally {
- stream.reset();
- if (releaseThreadLocal) {
- refHolder.inUse = false;
- }
}
}
@@ -116,35 +99,6 @@ public class WindmillStateTagUtil {
return ByteString.copyFromUtf8(tagString);
}
- private static class RefHolder {
-
- public SoftReference<@Nullable ByteStringOutputStream> streamRef =
- new SoftReference<>(new ByteStringOutputStream());
-
- // Boolean is true when the thread local stream is already in use by the
current thread.
- // Used to avoid reusing the same stream from nested calls if any.
- public boolean inUse = false;
- }
-
- private static RefHolder getRefHolderFromThreadLocal() {
- @Nullable RefHolder refHolder = threadLocalRefHolder.get();
- if (refHolder == null) {
- refHolder = new RefHolder();
- threadLocalRefHolder.set(refHolder);
- }
- return refHolder;
- }
-
- private static ByteStringOutputStream getByteStringOutputStream(RefHolder
refHolder) {
- @Nullable
- ByteStringOutputStream stream = refHolder.streamRef == null ? null :
refHolder.streamRef.get();
- if (stream == null) {
- stream = new ByteStringOutputStream();
- refHolder.streamRef = new SoftReference<>(stream);
- }
- return stream;
- }
-
/** @return the singleton WindmillStateTagUtil */
public static WindmillStateTagUtil instance() {
return INSTANCE;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStreamTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStreamTest.java
new file mode 100644
index 00000000000..ef167203a96
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStreamTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.junit.Assert.*;
+
+import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
+import org.junit.Test;
+
+public class ThreadLocalByteStringOutputStreamTest {
+
+ @Test
+ public void simple() {
+ try (StreamHandle streamHandle =
ThreadLocalByteStringOutputStream.acquire()) {
+ ByteStringOutputStream stream = streamHandle.stream();
+ stream.write(1);
+ stream.write(2);
+ stream.write(3);
+ assertEquals(ByteString.copyFrom(new byte[] {1, 2, 3}),
stream.toByteStringAndReset());
+ }
+ }
+
+ @Test
+ public void nested() {
+ try (StreamHandle streamHandle =
ThreadLocalByteStringOutputStream.acquire()) {
+ ByteStringOutputStream stream = streamHandle.stream();
+ stream.write(1);
+ try (StreamHandle streamHandle1 =
ThreadLocalByteStringOutputStream.acquire()) {
+ ByteStringOutputStream stream1 = streamHandle1.stream();
+ stream1.write(2);
+ assertEquals(ByteString.copyFrom(new byte[] {2}),
stream1.toByteStringAndReset());
+ }
+ stream.write(3);
+ assertEquals(ByteString.copyFrom(new byte[] {1, 3}),
stream.toByteStringAndReset());
+ }
+ }
+
+ @Test
+ public void resetDirtyStream() {
+ try (StreamHandle streamHandle =
ThreadLocalByteStringOutputStream.acquire()) {
+ ByteStringOutputStream stream = streamHandle.stream();
+ stream.write(1);
+ // Don't read/reset stream
+ }
+
+ try (StreamHandle streamHandle =
ThreadLocalByteStringOutputStream.acquire()) {
+ ByteStringOutputStream stream = streamHandle.stream();
+ assertEquals(ByteString.EMPTY, stream.toByteStringAndReset());
+ }
+ }
+}