yunfengzhou-hub commented on code in PR #24698: URL: https://github.com/apache/flink/pull/24698#discussion_r1584175343
########## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ########## @@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { /** The callback runner. */ protected final CallbackRunner callbackRunner; - public StateFutureImpl(CallbackRunner callbackRunner) { + /** The exception handler that handles callback framework's error. */ + protected final CallbackExceptionHandler exceptionHandler; + + public StateFutureImpl( + CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) { this.completableFuture = new CompletableFuture<>(); this.callbackRunner = callbackRunner; + this.exceptionHandler = exceptionHandler; } @Override - public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) { + public <U> StateFuture<U> thenApply( + FunctionWithException<? super T, ? extends U, ? extends Exception> fn) { callbackRegistered(); - try { - if (completableFuture.isDone()) { + + if (completableFuture.isDone()) { + try { U r = fn.apply(completableFuture.get()); callbackFinished(); return StateFutureUtils.completedFuture(r); - } else { - StateFutureImpl<U> ret = makeNewStateFuture(); - completableFuture.thenAccept( - (t) -> { - callbackRunner.submit( - () -> { - ret.completeInCallbackRunner(fn.apply(t)); - callbackFinished(); - }); - }); - return ret; + } catch (Throwable e) { + exceptionHandler.handleException( + "Caught exception when processing completed StateFuture's callback.", e); + return null; } - } catch (Throwable e) { - throw new FlinkRuntimeException("Error binding or executing callback", e); + } else { + StateFutureImpl<U> ret = makeNewStateFuture(); + completableFuture + .thenAccept( + (t) -> { + callbackRunner.submit( + () -> { + ret.completeInCallbackRunner(fn.apply(t)); + callbackFinished(); + }); + }) + .exceptionally( + (e) -> { + exceptionHandler.handleException( + "Caught exception when submitting StateFuture's callback.", + e); + return null; + }); + return ret; } } @Override - public StateFuture<Void> thenAccept(Consumer<? super T> action) { + public StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends Exception> action) { callbackRegistered(); - try { - if (completableFuture.isDone()) { + if (completableFuture.isDone()) { + try { action.accept(completableFuture.get()); callbackFinished(); return StateFutureUtils.completedVoidFuture(); Review Comment: Shall we wrap the three lines above into `callbakRunner.submit` as well? Or shall we add some description in the method's JavaDoc stating that this method must be invoked in the callbackRunner/mailboxThread? Same for other methods. ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java: ########## @@ -68,7 +72,17 @@ public void postComplete(boolean inCallbackRunner) { if (inCallbackRunner) { recordContext.release(Runnable::run); } else { - recordContext.release(callbackRunner::submit); + recordContext.release( + runnable -> { + try { + ThrowingRunnable<? extends Exception> throwingRunnable = + () -> runnable.run(); Review Comment: nit: runnable::run. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ########## @@ -78,13 +81,18 @@ public void setup( this.asyncExecutionController = new AsyncExecutionController( mailboxExecutor, + this::handleStateCallbackException, null, maxParallelism, asyncBufferSize, asyncBufferTimeout, inFlightRecordsLimit); } + private void handleStateCallbackException(String message, Throwable exception) { Review Comment: Shall we pass StreamTask.asyncExceptionHandler::handleAsyncException into this method, instead of introducing the same implementation? I'm a little concerned that if StreamTask.asyncExceptionHandler is modified in future, that committer might forget to change the implementation here, causing diverging implementation between sync and async mailbox model. ########## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ########## @@ -203,12 +249,29 @@ public void callbackFinished() { } @Override - public void thenSyncAccept(Consumer<? super T> action) { - completableFuture.thenAccept(action); + public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) { + completableFuture + .thenAccept(ThrowingConsumer.unchecked(action)) + .exceptionally( + (e) -> { + exceptionHandler.handleException( + "Caught exception when processing completed StateFuture's callback.", + e); + return null; + }); } /** The entry for a state future to submit task to mailbox. */ public interface CallbackRunner { - void submit(Runnable task); + void submit(ThrowingRunnable<? extends Exception> task); + } + + /** + * Handles an exception thrown by callback framework, borrowed idea from {@code + * AsyncExceptionHandler}. + */ + public interface CallbackExceptionHandler { Review Comment: What will happen in the current implementation, when there was not this pull request and the callbacks throw RuntimeException directly? Will the exceptions be caught by StreamTask.asyncExceptionHandler? ########## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java: ########## @@ -49,7 +49,7 @@ public interface StateFuture<T> { * @param action the action to perform before completing the returned StateFuture. * @return the new StateFuture. */ - StateFuture<Void> thenAccept(Consumer<? super T> action); + StateFuture<Void> thenAccept(ConsumerWithException<? super T, ? extends Exception> action); Review Comment: I suppose `CompletableFuture` has not been forcing users to handle exception. If users want to handle exception, they can use `whenComplete`, otherwise they can use `thenAccept`. The freedom is at the users'. ########## flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java: ########## @@ -32,12 +32,15 @@ */ @Experimental public class StateFutureUtils { + /** Returns a completed future that does nothing and return null. */ + @SuppressWarnings("unchecked") Review Comment: Changes in this file can be reversed. ########## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ########## @@ -203,12 +249,29 @@ public void callbackFinished() { } @Override - public void thenSyncAccept(Consumer<? super T> action) { - completableFuture.thenAccept(action); + public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) { + completableFuture + .thenAccept(ThrowingConsumer.unchecked(action)) + .exceptionally( + (e) -> { + exceptionHandler.handleException( + "Caught exception when processing completed StateFuture's callback.", + e); + return null; + }); } /** The entry for a state future to submit task to mailbox. */ public interface CallbackRunner { - void submit(Runnable task); + void submit(ThrowingRunnable<? extends Exception> task); + } + + /** + * Handles an exception thrown by callback framework, borrowed idea from {@code + * AsyncExceptionHandler}. Review Comment: It might be unnecessary to mention where the idea comes from. ########## flink-core/src/test/java/org/apache/flink/core/state/StateFutureTest.java: ########## @@ -37,19 +38,23 @@ /** Tests for {@link StateFuture} related implementations. */ public class StateFutureTest { + static StateFutureImpl.CallbackExceptionHandler exceptionHandler = Review Comment: It might be better to add unit tests to verify that exceptions thrown by a StateFuture or its subsequent processes can be correctly passed to this exceptionHandler. ########## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ########## @@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { /** The callback runner. */ protected final CallbackRunner callbackRunner; - public StateFutureImpl(CallbackRunner callbackRunner) { + /** The exception handler that handles callback framework's error. */ + protected final CallbackExceptionHandler exceptionHandler; + + public StateFutureImpl( + CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) { this.completableFuture = new CompletableFuture<>(); this.callbackRunner = callbackRunner; + this.exceptionHandler = exceptionHandler; } @Override - public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) { + public <U> StateFuture<U> thenApply( + FunctionWithException<? super T, ? extends U, ? extends Exception> fn) { callbackRegistered(); - try { - if (completableFuture.isDone()) { + + if (completableFuture.isDone()) { + try { U r = fn.apply(completableFuture.get()); callbackFinished(); return StateFutureUtils.completedFuture(r); Review Comment: The `exceptionHandler` used by this `StateFutureImpl` will be lost in the returned StateFuture, which I think is not expected. Shall we add the `exceptionHandler` to `CompletedStateFuture` as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org