yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1584175343


##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    /** The exception handler that handles callback framework's error. */
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
+    public <U> StateFuture<U> thenApply(
+            FunctionWithException<? super T, ? extends U, ? extends Exception> fn) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+
+        if (completableFuture.isDone()) {
+            try {
                 U r = fn.apply(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedFuture(r);
-            } else {
-                StateFutureImpl<U> ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.completeInCallbackRunner(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
-                return ret;
+            } catch (Throwable e) {
+                exceptionHandler.handleException(
+                        "Caught exception when processing completed StateFuture's callback.", e);
+                return null;
             }
-        } catch (Throwable e) {
-            throw new FlinkRuntimeException("Error binding or executing callback", e);
+        } else {
+            StateFutureImpl<U> ret = makeNewStateFuture();
+            completableFuture
+                    .thenAccept(
+                            (t) -> {
+                                callbackRunner.submit(
+                                        () -> {
+                                            ret.completeInCallbackRunner(fn.apply(t));
+                                            callbackFinished();
+                                        });
+                            })
+                    .exceptionally(
+                            (e) -> {
+                                exceptionHandler.handleException(
+                                        "Caught exception when submitting StateFuture's callback.",
+                                        e);
+                                return null;
+                            });
+            return ret;
         }
     }
 
     @Override
-    public StateFuture<Void> thenAccept(Consumer<? super T> action) {
+    public StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+        if (completableFuture.isDone()) {
+            try {
                 action.accept(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedVoidFuture();

Review Comment:
   Shall we wrap the three lines above in `callbackRunner.submit` as well? Or 
shall we add a note to the method's JavaDoc stating that this method must be 
invoked in the callbackRunner/mailbox thread? The same applies to the other methods.
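
   To make the first option concrete, here is a minimal sketch using only JDK types. `QueueingRunner` is a hypothetical stand-in for `CallbackRunner` (it simply queues tasks), and `thenAcceptViaRunner` is an illustrative helper, not code from the PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class SubmitCompletedBranchSketch {

    /** Hypothetical stand-in for StateFutureImpl.CallbackRunner; it only queues tasks. */
    static final class QueueingRunner {
        final Queue<Runnable> mailbox = new ArrayDeque<>();

        void submit(Runnable task) {
            mailbox.add(task);
        }

        /** Runs every queued task on the calling thread, in submission order. */
        void drain() {
            Runnable task;
            while ((task = mailbox.poll()) != null) {
                task.run();
            }
        }
    }

    /** Both branches hand the callback to the runner instead of running it inline. */
    static <T> void thenAcceptViaRunner(
            CompletableFuture<T> future, QueueingRunner runner, Consumer<T> action) {
        if (future.isDone()) {
            T value = future.join();
            runner.submit(() -> action.accept(value));
        } else {
            future.thenAccept(v -> runner.submit(() -> action.accept(v)));
        }
    }

    /** Records the order in which the caller and the callback run. */
    static String demo() {
        QueueingRunner runner = new QueueingRunner();
        StringBuilder order = new StringBuilder();
        thenAcceptViaRunner(
                CompletableFuture.completedFuture("v"), runner, v -> order.append("callback;"));
        order.append("caller;");
        runner.drain();
        return order.toString();
    }
}
```

   With this shape the callback no longer runs on the caller's stack, so the completed branch could not return an already-completed future the way the current diff does; that trade-off is part of what the question above is asking about.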



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java:
##########
@@ -68,7 +72,17 @@ public void postComplete(boolean inCallbackRunner) {
         if (inCallbackRunner) {
             recordContext.release(Runnable::run);
         } else {
-            recordContext.release(callbackRunner::submit);
+            recordContext.release(
+                    runnable -> {
+                        try {
+                            ThrowingRunnable<? extends Exception> throwingRunnable =
+                                    () -> runnable.run();

Review Comment:
   nit: `runnable::run`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##########
@@ -78,13 +81,18 @@ public void setup(
         this.asyncExecutionController =
                 new AsyncExecutionController(
                         mailboxExecutor,
+                        this::handleStateCallbackException,
                         null,
                         maxParallelism,
                         asyncBufferSize,
                         asyncBufferTimeout,
                         inFlightRecordsLimit);
     }
 
+    private void handleStateCallbackException(String message, Throwable exception) {

Review Comment:
   Shall we pass `StreamTask.asyncExceptionHandler::handleAsyncException` into 
this method instead of introducing a second copy of the same implementation? I'm 
a little concerned that if `StreamTask.asyncExceptionHandler` is modified in the 
future, the committer might forget to update the implementation here, causing 
the sync and async mailbox models to diverge.
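
   A minimal sketch of what reusing the handler through a method reference could look like. The two interfaces below are local stand-ins that only mirror the `(String, Throwable)` shape of `AsyncExceptionHandler` and `CallbackExceptionHandler`; `Controller` is hypothetical and is not `AsyncExecutionController`.

```java
final class SharedHandlerSketch {

    /** Local stand-in with the same shape as Flink's AsyncExceptionHandler. */
    interface AsyncExceptionHandler {
        void handleAsyncException(String message, Throwable exception);
    }

    /** Local stand-in with the same shape as StateFutureImpl.CallbackExceptionHandler. */
    interface CallbackExceptionHandler {
        void handleException(String message, Throwable exception);
    }

    /** Hypothetical consumer that only knows about the callback-side interface. */
    static final class Controller {
        private final CallbackExceptionHandler handler;

        Controller(CallbackExceptionHandler handler) {
            this.handler = handler;
        }

        void fail(String message, Throwable exception) {
            handler.handleException(message, exception);
        }
    }

    /** Routes a callback failure through the single task-level handler. */
    static String demo() {
        StringBuilder log = new StringBuilder();
        AsyncExceptionHandler taskHandler =
                (message, e) -> log.append(message).append(": ").append(e.getMessage());
        // The method reference adapts one interface to the other without a second implementation.
        Controller controller = new Controller(taskHandler::handleAsyncException);
        controller.fail("callback error", new RuntimeException("boom"));
        return log.toString();
    }
}
```

   Because the two interfaces share a signature, the method reference is enough; any later change to the task-level handler is picked up automatically.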



##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -203,12 +249,29 @@ public void callbackFinished() {
     }
 
     @Override
-    public void thenSyncAccept(Consumer<? super T> action) {
-        completableFuture.thenAccept(action);
+    public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
+        completableFuture
+                .thenAccept(ThrowingConsumer.unchecked(action))
+                .exceptionally(
+                        (e) -> {
+                            exceptionHandler.handleException(
+                                    "Caught exception when processing completed StateFuture's callback.",
+                                    e);
+                            return null;
+                        });
     }
 
     /** The entry for a state future to submit task to mailbox. */
     public interface CallbackRunner {
-        void submit(Runnable task);
+        void submit(ThrowingRunnable<? extends Exception> task);
+    }
+
+    /**
+     * Handles an exception thrown by callback framework, borrowed idea from {@code
+     * AsyncExceptionHandler}.
+     */
+    public interface CallbackExceptionHandler {

Review Comment:
   What happens in the current implementation, without this pull request, when 
a callback throws a RuntimeException directly? Will the exception be caught by 
`StreamTask.asyncExceptionHandler`?



##########
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java:
##########
@@ -49,7 +49,7 @@ public interface StateFuture<T> {
      * @param action the action to perform before completing the returned StateFuture.
      * @return the new StateFuture.
      */
-    StateFuture<Void> thenAccept(Consumer<? super T> action);
+    StateFuture<Void> thenAccept(ConsumerWithException<? super T, ? extends Exception> action);

Review Comment:
   As far as I know, `CompletableFuture` does not force users to handle 
exceptions. If users want to handle an exception they can use `whenComplete`; 
otherwise they can use `thenAccept`. The choice is left to the user.
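
   For reference, a small JDK-only sketch of the two options mentioned above. The class and method names are illustrative; only `CompletableFuture` itself is real API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class CompletableFutureExceptionDemo {

    /** thenAccept: the action is skipped when the upstream future has failed. */
    static boolean actionRanAfterFailure() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        boolean[] ran = {false};
        failed.thenAccept(v -> ran[0] = true);
        return ran[0];
    }

    /** whenComplete: the callback receives either the value or the failure. */
    static Throwable observedFailure() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        AtomicReference<Throwable> seen = new AtomicReference<>();
        failed.whenComplete((value, error) -> seen.set(error));
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("thenAccept action ran: " + actionRanAfterFailure());
        System.out.println("whenComplete saw: " + observedFailure());
    }
}
```

   Neither call requires the caller to declare or catch anything; handling the failure is opt-in through `whenComplete`.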



##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java:
##########
@@ -32,12 +32,15 @@
  */
 @Experimental
 public class StateFutureUtils {
+
     /** Returns a completed future that does nothing and return null. */
+    @SuppressWarnings("unchecked")

Review Comment:
   The changes in this file can be reverted.



##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -203,12 +249,29 @@ public void callbackFinished() {
     }
 
     @Override
-    public void thenSyncAccept(Consumer<? super T> action) {
-        completableFuture.thenAccept(action);
+    public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
+        completableFuture
+                .thenAccept(ThrowingConsumer.unchecked(action))
+                .exceptionally(
+                        (e) -> {
+                            exceptionHandler.handleException(
+                                    "Caught exception when processing completed StateFuture's callback.",
+                                    e);
+                            return null;
+                        });
     }
 
     /** The entry for a state future to submit task to mailbox. */
     public interface CallbackRunner {
-        void submit(Runnable task);
+        void submit(ThrowingRunnable<? extends Exception> task);
+    }
+
+    /**
+     * Handles an exception thrown by callback framework, borrowed idea from {@code
+     * AsyncExceptionHandler}.

Review Comment:
   It might be unnecessary to mention where the idea comes from.



##########
flink-core/src/test/java/org/apache/flink/core/state/StateFutureTest.java:
##########
@@ -37,19 +38,23 @@
 
 /** Tests for {@link StateFuture} related implementations. */
 public class StateFutureTest {
+    static StateFutureImpl.CallbackExceptionHandler exceptionHandler =

Review Comment:
   It might be better to add unit tests verifying that exceptions thrown by a 
StateFuture's callback, or by any callback chained after it, are correctly 
passed to this exceptionHandler.
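
   A rough sketch of the kind of check meant here, written against plain `CompletableFuture` so it stands alone. `ExceptionHandler` and `RecordingHandler` are hypothetical stand-ins for `CallbackExceptionHandler`, and `acceptWithHandler` only mirrors the `thenAccept(...).exceptionally(...)` chain from the diff; a real test would go through `StateFutureImpl` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

final class RecordingHandlerSketch {

    /** Hypothetical stand-in for StateFutureImpl.CallbackExceptionHandler. */
    interface ExceptionHandler {
        void handleException(String message, Throwable exception);
    }

    /** Records every exception so that a test can assert on it. */
    static final class RecordingHandler implements ExceptionHandler {
        final List<Throwable> seen = new ArrayList<>();

        @Override
        public void handleException(String message, Throwable exception) {
            seen.add(exception);
        }
    }

    /** Mirrors the thenAccept(...).exceptionally(...) chain used in the diff. */
    static void acceptWithHandler(
            CompletableFuture<Integer> future, Consumer<Integer> action, ExceptionHandler handler) {
        future.thenAccept(action)
                .exceptionally(
                        e -> {
                            handler.handleException("Caught exception in callback.", e);
                            return null;
                        });
    }

    /** The JDK wraps callback failures in CompletionException; return the original cause. */
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    /** Completes a future whose callback throws and returns what the handler recorded. */
    static List<Throwable> runFailingCallback() {
        RecordingHandler handler = new RecordingHandler();
        CompletableFuture<Integer> future = new CompletableFuture<>();
        acceptWithHandler(
                future,
                v -> {
                    throw new IllegalStateException("callback failed on " + v);
                },
                handler);
        future.complete(1);
        return handler.seen;
    }
}
```

   A test would then assert that exactly one exception was recorded and that `unwrap` of it is the `IllegalStateException` thrown by the callback.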



##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    /** The exception handler that handles callback framework's error. */
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
+    public <U> StateFuture<U> thenApply(
+            FunctionWithException<? super T, ? extends U, ? extends Exception> fn) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+
+        if (completableFuture.isDone()) {
+            try {
                 U r = fn.apply(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedFuture(r);

Review Comment:
   The `exceptionHandler` used by this `StateFutureImpl` is not carried over to 
the returned StateFuture, which I don't think is intended. Shall we add the 
`exceptionHandler` to `CompletedStateFuture` as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
