This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 797cb9cd843 [hotfix][runtime] Make sure run RecordContext#release() in Task thread. (#24705) 797cb9cd843 is described below commit 797cb9cd843b1f7c102339019b3dfc45c2fa329d Author: Wang FeiFan <zoltar9...@163.com> AuthorDate: Thu Apr 25 17:59:27 2024 +0800 [hotfix][runtime] Make sure run RecordContext#release() in Task thread. (#24705) --- .../org/apache/flink/core/state/StateFutureImpl.java | 18 ++++++++++++------ .../asyncprocessing/ContextStateFutureImpl.java | 10 +++++++--- .../flink/runtime/asyncprocessing/RecordContext.java | 17 ++++++++++++++--- .../runtime/asyncprocessing/ReferenceCounted.java | 11 ++++++++--- .../runtime/asyncprocessing/ReferenceCountedTest.java | 4 ++-- 5 files changed, 43 insertions(+), 17 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java index d7db0be5311..ed6e5963e75 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java @@ -65,7 +65,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { (t) -> { callbackRunner.submit( () -> { - ret.complete(fn.apply(t)); + ret.completeInCallbackRunner(fn.apply(t)); callbackFinished(); }); }); @@ -91,7 +91,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { callbackRunner.submit( () -> { action.accept(t); - ret.complete(null); + ret.completeInCallbackRunner(null); callbackFinished(); }); }); @@ -116,7 +116,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { callbackRunner.submit( () -> { StateFuture<U> su = action.apply(t); - su.thenAccept(ret::complete); + su.thenAccept(ret::completeInCallbackRunner); callbackFinished(); }); }); @@ -153,7 +153,8 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { (t) -> { callbackRunner.submit( () -> { - ret.complete(fn.apply(t, u)); + ret.completeInCallbackRunner( + fn.apply(t, u)); callbackFinished(); }); }); @@ -178,7 +179,12 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { @Override public void complete(T result) { completableFuture.complete(result); - postComplete(); + postComplete(false); + } + + private void completeInCallbackRunner(T result) { + completableFuture.complete(result); + postComplete(true); } /** Will be triggered when a callback is registered. */ @@ -187,7 +193,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { } /** Will be triggered when this future completes. */ - public void postComplete() { + public void postComplete(boolean inCallbackRunner) { // does nothing by default. } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java index 9355a43a795..cccd815af0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java @@ -61,17 +61,21 @@ public class ContextStateFutureImpl<T> extends StateFutureImpl<T> { } @Override - public void postComplete() { + public void postComplete(boolean inCallbackRunner) { // When a state request completes, ref count -1, as described in FLIP-425: // To cover the statements without a callback, in addition to the reference count marked // in Fig.5, each state request itself is also protected by a paired reference count. - recordContext.release(); + if (inCallbackRunner) { + recordContext.release(Runnable::run); + } else { + recordContext.release(callbackRunner::submit); + } } @Override public void callbackFinished() { // When a callback ends, as shown in Fig.5 of FLIP-425, at the // point of 2,4 and 6, the ref count -1. - recordContext.release(); + recordContext.release(Runnable::run); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java index d7dd393ccbb..0df885cc938 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.asyncprocessing; +import javax.annotation.Nullable; + import java.util.Objects; import java.util.function.Consumer; @@ -29,9 +31,10 @@ import java.util.function.Consumer; * * @param <K> The type of the key inside the record. */ -public class RecordContext<K> extends ReferenceCounted { +public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRunner> { /** The empty record for timer and non-record input usage. */ static final Object EMPTY_RECORD = new Object(); + /** The record to be processed. */ private final Object record; @@ -75,10 +78,14 @@ public class RecordContext<K> extends ReferenceCounted { } @Override - protected void referenceCountReachedZero() { + protected void referenceCountReachedZero(@Nullable DisposerRunner disposerRunner) { if (keyOccupied) { keyOccupied = false; - disposer.accept(this); + if (disposerRunner != null) { + disposerRunner.runDisposer(() -> disposer.accept(this)); + } else { + disposer.accept(this); + } } } @@ -115,4 +122,8 @@ public class RecordContext<K> extends ReferenceCounted { + getReferenceCount() + "}"; } + + public interface DisposerRunner { + void runDisposer(Runnable task); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java index aebd19ec4e1..ae1035deddb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java @@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemoryUtils; import sun.misc.Unsafe; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** @@ -32,7 +33,7 @@ import javax.annotation.concurrent.ThreadSafe; */ @Internal @ThreadSafe -public abstract class ReferenceCounted { +public abstract class ReferenceCounted<ReleaseHelper> { /** The "unsafe", which can be used to perform native memory accesses. */ @SuppressWarnings({"restriction", "UseOfSunClasses"}) @@ -87,9 +88,13 @@ public abstract class ReferenceCounted { } public int release() { + return release(null); + } + + public int release(@Nullable ReleaseHelper releaseHelper) { int r = unsafe.getAndAddInt(this, referenceOffset, -1) - 1; if (r == 0) { - referenceCountReachedZero(); + referenceCountReachedZero(releaseHelper); } return r; } @@ -99,5 +104,5 @@ public abstract class ReferenceCounted { } /** A method called when the reference count reaches zero. */ - protected abstract void referenceCountReachedZero(); + protected abstract void referenceCountReachedZero(@Nullable ReleaseHelper releaseHelper); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java index 1de590cad94..af9f1de948b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java @@ -57,7 +57,7 @@ class ReferenceCountedTest { assertThat(referenceCounted.getReferenceCount()).isEqualTo(0); } - private static class TestReferenceCounted extends ReferenceCounted { + private static class TestReferenceCounted extends ReferenceCounted<Void> { private boolean reachedZero = false; public TestReferenceCounted() { @@ -65,7 +65,7 @@ class ReferenceCountedTest { } @Override - protected void referenceCountReachedZero() { + protected void referenceCountReachedZero(Void v) { reachedZero = true; } }