fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590672845


##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -203,12 +249,29 @@ public void callbackFinished() {
     }
 
     @Override
-    public void thenSyncAccept(Consumer<? super T> action) {
-        completableFuture.thenAccept(action);
+    public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
+        completableFuture
+                .thenAccept(ThrowingConsumer.unchecked(action))
+                .exceptionally(
+                        (e) -> {
+                            exceptionHandler.handleException(
+                                    "Caught exception when processing completed StateFuture's callback.",
+                                    e);
+                            return null;
+                        });
     }
 
     /** The entry for a state future to submit task to mailbox. */
     public interface CallbackRunner {
-        void submit(Runnable task);
+        void submit(ThrowingRunnable<? extends Exception> task);
+    }
+
+    /**
+     * Handles an exception thrown by callback framework, borrowed idea from {@code
+     * AsyncExceptionHandler}.
+     */
+    public interface CallbackExceptionHandler {

Review Comment:
   Without this PR, the callbacks throw a `RuntimeException` directly.
   With this PR:
   1. Framework exceptions (such as a failure to submit to the mailbox) are caught by `CallbackExceptionHandler` and ultimately handled by `AbstractAsyncStateStreamOperator#handleStateCallbackException`.
   2. Exceptions thrown by user code or state access are caught by the mailbox.
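
   For illustration, here is a minimal, self-contained sketch of the `thenAccept(...).exceptionally(...)` pattern from the diff above. It is not the Flink implementation: `CallbackExceptionDemo`, the nested `ThrowingConsumer` and `CallbackExceptionHandler` interfaces, the `rootCause` helper, and the messages are hypothetical stand-ins, and the mailbox path is not modeled. It only shows how a checked exception thrown inside a callback can be rethrown unchecked and then routed to a handler instead of being silently dropped by the `CompletableFuture`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Consumer;

   class CallbackExceptionDemo {

       /** Hypothetical stand-in for a consumer that may throw a checked exception. */
       @FunctionalInterface
       interface ThrowingConsumer<T, E extends Exception> {
           void accept(T value) throws E;

           /** Wraps a throwing consumer so it fits java.util.function.Consumer. */
           static <T> Consumer<T> unchecked(ThrowingConsumer<? super T, ? extends Exception> c) {
               return value -> {
                   try {
                       c.accept(value);
                   } catch (RuntimeException e) {
                       throw e;
                   } catch (Exception e) {
                       throw new RuntimeException(e);
                   }
               };
           }
       }

       /** Hypothetical stand-in for the handler interface introduced in the diff. */
       interface CallbackExceptionHandler {
           void handleException(String message, Throwable cause);
       }

       /** Unwraps nested causes (CompletionException -> RuntimeException -> original). */
       static Throwable rootCause(Throwable t) {
           while (t.getCause() != null) {
               t = t.getCause();
           }
           return t;
       }

       /** Runs one failing callback and returns what the handler recorded. */
       static List<String> run() {
           List<String> handled = new ArrayList<>();
           CallbackExceptionHandler handler =
                   (message, cause) -> handled.add(message + " -> " + rootCause(cause).getMessage());

           CompletableFuture<String> future = new CompletableFuture<>();
           ThrowingConsumer<String, Exception> action =
                   value -> {
                       throw new Exception("boom: " + value);
                   };

           // Same shape as the diff: run the callback, then route any failure to the handler.
           future.thenAccept(ThrowingConsumer.unchecked(action))
                   .exceptionally(
                           e -> {
                               handler.handleException("Caught exception in callback", e);
                               return null;
                           });

           // Completing the future runs the registered callback on this thread.
           future.complete("state");
           return handled;
       }

       public static void main(String[] args) {
           System.out.println(run());
       }
   }
   ```

   Without the `exceptionally` stage, the failure would only be stored in the future returned by `thenAccept`, which nothing observes, so the exception would go unnoticed.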


