scwhittle commented on code in PR #36742:
URL: https://github.com/apache/beam/pull/36742#discussion_r2502963916
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java:
##########
@@ -74,38 +82,39 @@ public ByteStringOutputStream stream() {
@Override
public void close() {
stream.reset();
- if (releaseThreadLocal) {
+ if (refHolder != null) {
refHolder.inUse = false;
}
}
}
private static class RefHolder {
- public SoftReference<@Nullable ByteStringOutputStream> streamRef =
- new SoftReference<>(new ByteStringOutputStream());
+ public ByteStringOutputStream stream = new ByteStringOutputStream();
// Boolean is true when the thread local stream is already in use by the
current thread.
// Used to avoid reusing the same stream from nested calls if any.
public boolean inUse = false;
- }
- private static RefHolder getRefHolderFromThreadLocal() {
- @Nullable RefHolder refHolder = threadLocalRefHolder.get();
- if (refHolder == null) {
- refHolder = new RefHolder();
- threadLocalRefHolder.set(refHolder);
+ public @Nullable StreamHandle streamHandle = null;
+
+ public static RefHolder create() {
+ RefHolder refHolder = new RefHolder();
+ refHolder.streamHandle = new StreamHandle(refHolder);
+ return refHolder;
}
- return refHolder;
}
- private static ByteStringOutputStream getByteStringOutputStream(RefHolder
refHolder) {
- @Nullable
- ByteStringOutputStream stream = refHolder.streamRef == null ? null :
refHolder.streamRef.get();
- if (stream == null) {
- stream = new ByteStringOutputStream();
- refHolder.streamRef = new SoftReference<>(stream);
+ private static RefHolder getRefHolderFromThreadLocal() {
+ @Nullable SoftReference<RefHolder> refHolderSoftReference =
threadLocalRefHolder.get();
+ @Nullable RefHolder refHolder = null;
+ if (refHolderSoftReference != null) {
+ refHolder = refHolderSoftReference.get();
}
- return stream;
+ if (refHolderSoftReference == null || refHolder == null) {
+ refHolder = RefHolder.create();
+ threadLocalRefHolder.set(new SoftReference<>(refHolder));
Review Comment:
Note that ThreadLocal.set is slow; we've had it show up in profiles before.
What about making the streamHandle within RefHolder a SoftReference instead of
storing SoftReference directly in the ThreadLocal? Then we can just keep the
ThreadLocal state around once created.
You could also use ThreadLocal.withInitial to construct RefHolders, get rid of
this function, and just inline the SoftReference handling into acquire.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java:
##########
@@ -74,38 +82,39 @@ public ByteStringOutputStream stream() {
@Override
public void close() {
stream.reset();
- if (releaseThreadLocal) {
+ if (refHolder != null) {
refHolder.inUse = false;
}
}
}
private static class RefHolder {
- public SoftReference<@Nullable ByteStringOutputStream> streamRef =
- new SoftReference<>(new ByteStringOutputStream());
+ public ByteStringOutputStream stream = new ByteStringOutputStream();
Review Comment:
Remove this field (`stream`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]