fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590678974
##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
/** The callback runner. */
protected final CallbackRunner callbackRunner;
- public StateFutureImpl(CallbackRunner callbackRunner) {
+ /** The exception handler that handles callback framework's error. */
+ protected final CallbackExceptionHandler exceptionHandler;
+
+ public StateFutureImpl(
+ CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
this.completableFuture = new CompletableFuture<>();
this.callbackRunner = callbackRunner;
+ this.exceptionHandler = exceptionHandler;
}
@Override
- public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
+ public <U> StateFuture<U> thenApply(
+ FunctionWithException<? super T, ? extends U, ? extends Exception> fn) {
callbackRegistered();
- try {
- if (completableFuture.isDone()) {
+
+ if (completableFuture.isDone()) {
+ try {
U r = fn.apply(completableFuture.get());
callbackFinished();
return StateFutureUtils.completedFuture(r);
Review Comment:
The `exceptionHandler` is meant for framework errors, such as a failure to
submit a callback to the mailbox.
Exceptions thrown in the `completableFuture.isDone()` branch and in
`CompletedStateFuture` come from user code, so they are handled by the mailbox
directly rather than by `exceptionHandler`.
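
To illustrate the distinction, here is a minimal sketch. It assumes simplified, hypothetical stand-ins (`SketchFuture`, `Mailbox`, `FrameworkErrorHandler`, `ThrowingFunction`) and is not the actual Flink implementation: only a failed submission reaches the handler, while user-code exceptions surface wherever the callback runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-ins for illustration only; none of these are Flink classes. */
public class ExceptionRoutingSketch {

    /** Simplified analogue of a function that may throw a checked exception. */
    interface ThrowingFunction<T, R> {
        R apply(T value) throws Exception;
    }

    /** Receives framework errors only (assumed role of the exception handler). */
    interface FrameworkErrorHandler {
        void handle(String message, Throwable cause);
    }

    /** Toy mailbox: queues actions, refuses new ones once closed. */
    static final class Mailbox {
        private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
        boolean closed;

        void submit(Runnable action) {
            if (closed) {
                throw new IllegalStateException("mailbox closed");
            }
            queue.add(action);
        }

        /** Runs queued actions; exceptions from user code propagate out of here. */
        void drain() {
            Runnable action;
            while ((action = queue.poll()) != null) {
                action.run();
            }
        }
    }

    static final class SketchFuture<T> {
        private final CompletableFuture<T> inner = new CompletableFuture<>();
        private final Mailbox mailbox;
        private final FrameworkErrorHandler handler;

        SketchFuture(Mailbox mailbox, FrameworkErrorHandler handler) {
            this.mailbox = mailbox;
            this.handler = handler;
        }

        void complete(T value) {
            inner.complete(value);
        }

        <U> CompletableFuture<U> thenApply(ThrowingFunction<? super T, ? extends U> fn) {
            if (inner.isDone()) {
                // User code runs inline; its exception goes to the caller, not the handler.
                U r = callUser(fn, inner.join());
                return CompletableFuture.completedFuture(r);
            }
            CompletableFuture<U> result = new CompletableFuture<>();
            inner.thenAccept(value -> {
                try {
                    // Only the submission is guarded; user code runs later, in drain().
                    mailbox.submit(() -> result.complete(callUser(fn, value)));
                } catch (RuntimeException e) {
                    handler.handle("failed to submit callback to mailbox", e);
                }
            });
            return result;
        }

        /** Wraps checked user exceptions so they can propagate unchecked. */
        private <U> U callUser(ThrowingFunction<? super T, ? extends U> fn, T value) {
            try {
                return fn.apply(value);
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    /** Exercises the three paths and records where each error surfaced. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        FrameworkErrorHandler handler = (msg, cause) -> events.add("framework: " + msg);

        // 1. Framework error: the mailbox rejects the submission.
        Mailbox closedMailbox = new Mailbox();
        closedMailbox.closed = true;
        SketchFuture<Integer> f1 = new SketchFuture<>(closedMailbox, handler);
        f1.thenApply(x -> x + 1);
        f1.complete(1);

        // 2. User error on an already completed future: thrown to the caller.
        SketchFuture<Integer> f2 = new SketchFuture<>(new Mailbox(), handler);
        f2.complete(1);
        try {
            f2.thenApply(x -> { throw new IllegalArgumentException("user bug"); });
        } catch (RuntimeException e) {
            events.add("user (inline): " + e.getMessage());
        }

        // 3. User error in a queued callback: surfaces when the mailbox runs it.
        Mailbox openMailbox = new Mailbox();
        SketchFuture<Integer> f3 = new SketchFuture<>(openMailbox, handler);
        f3.thenApply(x -> { throw new IllegalArgumentException("user bug"); });
        f3.complete(1);
        try {
            openMailbox.drain();
        } catch (RuntimeException e) {
            events.add("user (mailbox): " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the handler is invoked only in the first case. The two user-code failures never reach it, which is the separation described above.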