fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590680460


##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements 
InternalStateFuture<T> {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    /** The exception handler that handles callback framework's error. */
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler 
exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
+    public <U> StateFuture<U> thenApply(
+            FunctionWithException<? super T, ? extends U, ? extends Exception> 
fn) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+
+        if (completableFuture.isDone()) {
+            try {
                 U r = fn.apply(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedFuture(r);
-            } else {
-                StateFutureImpl<U> ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        
ret.completeInCallbackRunner(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
-                return ret;
+            } catch (Throwable e) {
+                exceptionHandler.handleException(
+                        "Caught exception when processing completed 
StateFuture's callback.", e);
+                return null;
             }
-        } catch (Throwable e) {
-            throw new FlinkRuntimeException("Error binding or executing 
callback", e);
+        } else {
+            StateFutureImpl<U> ret = makeNewStateFuture();
+            completableFuture
+                    .thenAccept(
+                            (t) -> {
+                                callbackRunner.submit(
+                                        () -> {
+                                            
ret.completeInCallbackRunner(fn.apply(t));
+                                            callbackFinished();
+                                        });
+                            })
+                    .exceptionally(
+                            (e) -> {
+                                exceptionHandler.handleException(
+                                        "Caught exception when submitting 
StateFuture's callback.",
+                                        e);
+                                return null;
+                            });
+            return ret;
         }
     }
 
     @Override
-    public StateFuture<Void> thenAccept(Consumer<? super T> action) {
+    public StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends 
Exception> action) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+        if (completableFuture.isDone()) {
+            try {
                 action.accept(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedVoidFuture();

Review Comment:
   Thanks for the suggestion; I have added some description here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to