yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1584175343
##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements
InternalStateFuture<T> {
/** The callback runner. */
protected final CallbackRunner callbackRunner;
- public StateFutureImpl(CallbackRunner callbackRunner) {
+ /** The exception handler that handles callback framework's error. */
+ protected final CallbackExceptionHandler exceptionHandler;
+
+ public StateFutureImpl(
+ CallbackRunner callbackRunner, CallbackExceptionHandler
exceptionHandler) {
this.completableFuture = new CompletableFuture<>();
this.callbackRunner = callbackRunner;
+ this.exceptionHandler = exceptionHandler;
}
@Override
- public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
+ public <U> StateFuture<U> thenApply(
+ FunctionWithException<? super T, ? extends U, ? extends Exception>
fn) {
callbackRegistered();
- try {
- if (completableFuture.isDone()) {
+
+ if (completableFuture.isDone()) {
+ try {
U r = fn.apply(completableFuture.get());
callbackFinished();
return StateFutureUtils.completedFuture(r);
- } else {
- StateFutureImpl<U> ret = makeNewStateFuture();
- completableFuture.thenAccept(
- (t) -> {
- callbackRunner.submit(
- () -> {
-
ret.completeInCallbackRunner(fn.apply(t));
- callbackFinished();
- });
- });
- return ret;
+ } catch (Throwable e) {
+ exceptionHandler.handleException(
+ "Caught exception when processing completed
StateFuture's callback.", e);
+ return null;
}
- } catch (Throwable e) {
- throw new FlinkRuntimeException("Error binding or executing
callback", e);
+ } else {
+ StateFutureImpl<U> ret = makeNewStateFuture();
+ completableFuture
+ .thenAccept(
+ (t) -> {
+ callbackRunner.submit(
+ () -> {
+
ret.completeInCallbackRunner(fn.apply(t));
+ callbackFinished();
+ });
+ })
+ .exceptionally(
+ (e) -> {
+ exceptionHandler.handleException(
+ "Caught exception when submitting
StateFuture's callback.",
+ e);
+ return null;
+ });
+ return ret;
}
}
@Override
- public StateFuture<Void> thenAccept(Consumer<? super T> action) {
+ public StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends
Exception> action) {
callbackRegistered();
- try {
- if (completableFuture.isDone()) {
+ if (completableFuture.isDone()) {
+ try {
action.accept(completableFuture.get());
callbackFinished();
return StateFutureUtils.completedVoidFuture();
Review Comment:
Shall we wrap the three lines above into `callbakRunner.submit` as well? Or
shall we add some description in the method's JavaDoc stating that this method
must be invoked in the callbackRunner/mailboxThread? Same for other methods.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java:
##########
@@ -68,7 +72,17 @@ public void postComplete(boolean inCallbackRunner) {
if (inCallbackRunner) {
recordContext.release(Runnable::run);
} else {
- recordContext.release(callbackRunner::submit);
+ recordContext.release(
+ runnable -> {
+ try {
+ ThrowingRunnable<? extends Exception>
throwingRunnable =
+ () -> runnable.run();
Review Comment:
nit: runnable::run.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##########
@@ -78,13 +81,18 @@ public void setup(
this.asyncExecutionController =
new AsyncExecutionController(
mailboxExecutor,
+ this::handleStateCallbackException,
null,
maxParallelism,
asyncBufferSize,
asyncBufferTimeout,
inFlightRecordsLimit);
}
+ private void handleStateCallbackException(String message, Throwable
exception) {
Review Comment:
Shall we pass StreamTask.asyncExceptionHandler::handleAsyncException into
this method, instead of introducing the same implementation? I'm a little
concerned that if StreamTask.asyncExceptionHandler is modified in future, that
committer might forget to change the implementation here, causing diverging
implementation between sync and async mailbox model.
##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -203,12 +249,29 @@ public void callbackFinished() {
}
@Override
- public void thenSyncAccept(Consumer<? super T> action) {
- completableFuture.thenAccept(action);
+ public void thenSyncAccept(ThrowingConsumer<? super T, ? extends
Exception> action) {
+ completableFuture
+ .thenAccept(ThrowingConsumer.unchecked(action))
+ .exceptionally(
+ (e) -> {
+ exceptionHandler.handleException(
+ "Caught exception when processing
completed StateFuture's callback.",
+ e);
+ return null;
+ });
}
/** The entry for a state future to submit task to mailbox. */
public interface CallbackRunner {
- void submit(Runnable task);
+ void submit(ThrowingRunnable<? extends Exception> task);
+ }
+
+ /**
+ * Handles an exception thrown by callback framework, borrowed idea from
{@code
+ * AsyncExceptionHandler}.
+ */
+ public interface CallbackExceptionHandler {
Review Comment:
What will happen in the current implementation, when there was not this pull
request and the callbacks throw RuntimeException directly? Will the exceptions
be caught by StreamTask.asyncExceptionHandler?
##########
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java:
##########
@@ -49,7 +49,7 @@ public interface StateFuture<T> {
* @param action the action to perform before completing the returned
StateFuture.
* @return the new StateFuture.
*/
- StateFuture<Void> thenAccept(Consumer<? super T> action);
+ StateFuture<Void> thenAccept(ConsumerWithException<? super T, ? extends
Exception> action);
Review Comment:
I suppose `CompletableFuture` has not been forcing users to handle
exception. If users want to handle exception, they can use `whenComplete`,
otherwise they can use `thenAccept`. The freedom is at the users'.
##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java:
##########
@@ -32,12 +32,15 @@
*/
@Experimental
public class StateFutureUtils {
+
/** Returns a completed future that does nothing and return null. */
+ @SuppressWarnings("unchecked")
Review Comment:
Changes in this file can be reversed.
##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -203,12 +249,29 @@ public void callbackFinished() {
}
@Override
- public void thenSyncAccept(Consumer<? super T> action) {
- completableFuture.thenAccept(action);
+ public void thenSyncAccept(ThrowingConsumer<? super T, ? extends
Exception> action) {
+ completableFuture
+ .thenAccept(ThrowingConsumer.unchecked(action))
+ .exceptionally(
+ (e) -> {
+ exceptionHandler.handleException(
+ "Caught exception when processing
completed StateFuture's callback.",
+ e);
+ return null;
+ });
}
/** The entry for a state future to submit task to mailbox. */
public interface CallbackRunner {
- void submit(Runnable task);
+ void submit(ThrowingRunnable<? extends Exception> task);
+ }
+
+ /**
+ * Handles an exception thrown by callback framework, borrowed idea from
{@code
+ * AsyncExceptionHandler}.
Review Comment:
It might be unnecessary to mention where the idea comes from.
##########
flink-core/src/test/java/org/apache/flink/core/state/StateFutureTest.java:
##########
@@ -37,19 +38,23 @@
/** Tests for {@link StateFuture} related implementations. */
public class StateFutureTest {
+ static StateFutureImpl.CallbackExceptionHandler exceptionHandler =
Review Comment:
It might be better to add unit tests to verify that exceptions thrown by a
StateFuture or its subsequent processes can be correctly passed to this
exceptionHandler.
##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements
InternalStateFuture<T> {
/** The callback runner. */
protected final CallbackRunner callbackRunner;
- public StateFutureImpl(CallbackRunner callbackRunner) {
+ /** The exception handler that handles callback framework's error. */
+ protected final CallbackExceptionHandler exceptionHandler;
+
+ public StateFutureImpl(
+ CallbackRunner callbackRunner, CallbackExceptionHandler
exceptionHandler) {
this.completableFuture = new CompletableFuture<>();
this.callbackRunner = callbackRunner;
+ this.exceptionHandler = exceptionHandler;
}
@Override
- public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
+ public <U> StateFuture<U> thenApply(
+ FunctionWithException<? super T, ? extends U, ? extends Exception>
fn) {
callbackRegistered();
- try {
- if (completableFuture.isDone()) {
+
+ if (completableFuture.isDone()) {
+ try {
U r = fn.apply(completableFuture.get());
callbackFinished();
return StateFutureUtils.completedFuture(r);
Review Comment:
The `exceptionHandler` used by this `StateFutureImpl` will be lost in the
returned StateFuture, which I think is not expected. Shall we add the
`exceptionHandler` to `CompletedStateFuture` as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]