This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 797cb9cd843 [hotfix][runtime] Make sure run RecordContext#release() in Task thread. (#24705)
797cb9cd843 is described below

commit 797cb9cd843b1f7c102339019b3dfc45c2fa329d
Author: Wang FeiFan <zoltar9...@163.com>
AuthorDate: Thu Apr 25 17:59:27 2024 +0800

    [hotfix][runtime] Make sure run RecordContext#release() in Task thread. (#24705)
---
 .../org/apache/flink/core/state/StateFutureImpl.java   | 18 ++++++++++++------
 .../asyncprocessing/ContextStateFutureImpl.java        | 10 +++++++---
 .../flink/runtime/asyncprocessing/RecordContext.java   | 17 ++++++++++++++---
 .../runtime/asyncprocessing/ReferenceCounted.java      | 11 ++++++++---
 .../runtime/asyncprocessing/ReferenceCountedTest.java  |  4 ++--
 5 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
index d7db0be5311..ed6e5963e75 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
@@ -65,7 +65,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
                         (t) -> {
                             callbackRunner.submit(
                                     () -> {
-                                        ret.complete(fn.apply(t));
+                                        ret.completeInCallbackRunner(fn.apply(t));
                                         callbackFinished();
                                     });
                         });
@@ -91,7 +91,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
                             callbackRunner.submit(
                                     () -> {
                                         action.accept(t);
-                                        ret.complete(null);
+                                        ret.completeInCallbackRunner(null);
                                         callbackFinished();
                                     });
                         });
@@ -116,7 +116,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
                             callbackRunner.submit(
                                     () -> {
                                         StateFuture<U> su = action.apply(t);
-                                        su.thenAccept(ret::complete);
+                                        su.thenAccept(ret::completeInCallbackRunner);
                                         callbackFinished();
                                     });
                         });
@@ -153,7 +153,8 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
                                             (t) -> {
                                                 callbackRunner.submit(
                                                         () -> {
-                                                            ret.complete(fn.apply(t, u));
+                                                            ret.completeInCallbackRunner(
+                                                                    fn.apply(t, u));
                                                             callbackFinished();
                                                         });
                                             });
@@ -178,7 +179,12 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
     @Override
     public void complete(T result) {
         completableFuture.complete(result);
-        postComplete();
+        postComplete(false);
+    }
+
+    private void completeInCallbackRunner(T result) {
+        completableFuture.complete(result);
+        postComplete(true);
     }
 
     /** Will be triggered when a callback is registered. */
@@ -187,7 +193,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
     }
 
     /** Will be triggered when this future completes. */
-    public void postComplete() {
+    public void postComplete(boolean inCallbackRunner) {
         // does nothing by default.
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
index 9355a43a795..cccd815af0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java
@@ -61,17 +61,21 @@ public class ContextStateFutureImpl<T> extends StateFutureImpl<T> {
     }
 
     @Override
-    public void postComplete() {
+    public void postComplete(boolean inCallbackRunner) {
         // When a state request completes, ref count -1, as described in FLIP-425:
         // To cover the statements without a callback, in addition to the reference count marked
         // in Fig.5, each state request itself is also protected by a paired reference count.
-        recordContext.release();
+        if (inCallbackRunner) {
+            recordContext.release(Runnable::run);
+        } else {
+            recordContext.release(callbackRunner::submit);
+        }
     }
 
     @Override
     public void callbackFinished() {
         // When a callback ends, as shown in Fig.5 of FLIP-425, at the
         // point of 2,4 and 6, the ref count -1.
-        recordContext.release();
+        recordContext.release(Runnable::run);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
index d7dd393ccbb..0df885cc938 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.asyncprocessing;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
 import java.util.function.Consumer;
 
@@ -29,9 +31,10 @@ import java.util.function.Consumer;
  *
  * @param <K> The type of the key inside the record.
  */
-public class RecordContext<K> extends ReferenceCounted {
+public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRunner> {
     /** The empty record for timer and non-record input usage. */
     static final Object EMPTY_RECORD = new Object();
+
     /** The record to be processed. */
     private final Object record;
 
@@ -75,10 +78,14 @@ public class RecordContext<K> extends ReferenceCounted {
     }
 
     @Override
-    protected void referenceCountReachedZero() {
+    protected void referenceCountReachedZero(@Nullable DisposerRunner disposerRunner) {
         if (keyOccupied) {
             keyOccupied = false;
-            disposer.accept(this);
+            if (disposerRunner != null) {
+                disposerRunner.runDisposer(() -> disposer.accept(this));
+            } else {
+                disposer.accept(this);
+            }
         }
     }
 
@@ -115,4 +122,8 @@ public class RecordContext<K> extends ReferenceCounted {
                 + getReferenceCount()
                 + "}";
     }
+
+    public interface DisposerRunner {
+        void runDisposer(Runnable task);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java
index aebd19ec4e1..ae1035deddb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemoryUtils;
 
 import sun.misc.Unsafe;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 /**
@@ -32,7 +33,7 @@ import javax.annotation.concurrent.ThreadSafe;
  */
 @Internal
 @ThreadSafe
-public abstract class ReferenceCounted {
+public abstract class ReferenceCounted<ReleaseHelper> {
 
     /** The "unsafe", which can be used to perform native memory accesses. */
     @SuppressWarnings({"restriction", "UseOfSunClasses"})
@@ -87,9 +88,13 @@ public abstract class ReferenceCounted {
     }
 
     public int release() {
+        return release(null);
+    }
+
+    public int release(@Nullable ReleaseHelper releaseHelper) {
         int r = unsafe.getAndAddInt(this, referenceOffset, -1) - 1;
         if (r == 0) {
-            referenceCountReachedZero();
+            referenceCountReachedZero(releaseHelper);
         }
         return r;
     }
@@ -99,5 +104,5 @@ public abstract class ReferenceCounted {
     }
 
     /** A method called when the reference count reaches zero. */
-    protected abstract void referenceCountReachedZero();
+    protected abstract void referenceCountReachedZero(@Nullable ReleaseHelper releaseHelper);
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java
index 1de590cad94..af9f1de948b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java
@@ -57,7 +57,7 @@ class ReferenceCountedTest {
         assertThat(referenceCounted.getReferenceCount()).isEqualTo(0);
     }
 
-    private static class TestReferenceCounted extends ReferenceCounted {
+    private static class TestReferenceCounted extends ReferenceCounted<Void> {
         private boolean reachedZero = false;
 
         public TestReferenceCounted() {
@@ -65,7 +65,7 @@ class ReferenceCountedTest {
         }
 
         @Override
-        protected void referenceCountReachedZero() {
+        protected void referenceCountReachedZero(Void v) {
             reachedZero = true;
         }
     }

Reply via email to