XComp commented on code in PR #22764:
URL: https://github.com/apache/flink/pull/22764#discussion_r1228191494
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java:
##########
@@ -85,17 +86,52 @@ private static RobustActorSystem create(
.map(BootstrapSetup::defaultExecutionContext)
.flatMap(RobustActorSystem::toJavaOptional));
+ final PostShutdownClassLoadingErrorFilter postShutdownClassLoadingErrorFilter =
+ new PostShutdownClassLoadingErrorFilter(uncaughtExceptionHandler);
+
final RobustActorSystem robustActorSystem =
new RobustActorSystem(name, appConfig, classLoader, defaultEC, setup) {
@Override
public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
- return uncaughtExceptionHandler.getOrElse(super::uncaughtExceptionHandler);
+ return postShutdownClassLoadingErrorFilter;
}
};
+ robustActorSystem.registerOnTermination(
+ postShutdownClassLoadingErrorFilter::notifyShutdownComplete);
+
robustActorSystem.start();
return robustActorSystem;
}
+ private static class PostShutdownClassLoadingErrorFilter
+ implements Thread.UncaughtExceptionHandler {
+
+ private final AtomicBoolean shutdownComplete = new AtomicBoolean();
+ private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+ public PostShutdownClassLoadingErrorFilter(
+ Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+ this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+ }
+
+ public void notifyShutdownComplete() {
+ shutdownComplete.set(true);
+ }
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ if (shutdownComplete.get()
+ && (e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException)) {
Review Comment:
I'm wondering whether we should make this more restrictive by also checking
that the thread's name contains the `akka` keyword. So far, the errors reported
under FLINK-32189 have only surfaced on the `flink-akka` and
`flink-metrics-akka` threads.
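A minimal sketch of that stricter check, assuming thread names are a reliable signal. The class name `AkkaThreadClassLoadingErrorFilter` and the plain `"akka"` substring match are hypothetical and not part of the PR; the rest mirrors the filter in the diff:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical variant of PostShutdownClassLoadingErrorFilter: after shutdown, it only swallows
 * class-loading errors raised on threads whose name contains "akka"; everything else is
 * forwarded to the delegate handler.
 */
final class AkkaThreadClassLoadingErrorFilter implements Thread.UncaughtExceptionHandler {

    private final AtomicBoolean shutdownComplete = new AtomicBoolean();
    private final Thread.UncaughtExceptionHandler delegate;

    AkkaThreadClassLoadingErrorFilter(Thread.UncaughtExceptionHandler delegate) {
        this.delegate = delegate;
    }

    void notifyShutdownComplete() {
        shutdownComplete.set(true);
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        if (shutdownComplete.get() && isClassLoadingError(e) && isAkkaThread(t)) {
            // Swallow: assumed to be a late class-loading failure after the system shut down.
            return;
        }
        delegate.uncaughtException(t, e);
    }

    private static boolean isClassLoadingError(Throwable e) {
        return e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException;
    }

    // Assumption: affected threads are named like "flink-akka..." or "flink-metrics-akka...".
    private static boolean isAkkaThread(Thread t) {
        return t.getName().contains("akka");
    }
}
```

The trade-off: the name check narrows the filter to the threads observed so far, but it would silently stop matching if the dispatcher thread naming ever changed.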
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java:
##########
Review Comment:
It's not part of this change, but why do we have to declare
`RobustActorSystem` as abstract and override
`uncaughtExceptionHandler()` instead of passing the exception handler through
the constructor and implementing the method directly? :thinking:
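One plausible answer, assuming the base class (Akka's actor system implementation) already calls `uncaughtExceptionHandler()` from its own constructor: a handler stored in a subclass field is still `null` at that point, whereas an anonymous subclass that captures a local variable does see it, because javac initializes captured variables before invoking the super constructor. The classes `BaseSystem` and `InjectedHandlerSystem` below are hypothetical stand-ins that only illustrate this initialization order:

```java
// Hypothetical stand-in for a base class whose constructor already needs the handler,
// e.g. to build a thread factory.
abstract class BaseSystem {
    final Thread.UncaughtExceptionHandler handlerSeenDuringConstruction;

    BaseSystem() {
        // Calling an overridable method here runs before any subclass constructor body.
        this.handlerSeenDuringConstruction = uncaughtExceptionHandler();
    }

    public abstract Thread.UncaughtExceptionHandler uncaughtExceptionHandler();

    // Mirrors the pattern in create(...): an anonymous subclass capturing the handler.
    static BaseSystem withAnonymousSubclass(Thread.UncaughtExceptionHandler handler) {
        return new BaseSystem() {
            @Override
            public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
                return handler; // captured local, assigned by javac before super() runs
            }
        };
    }
}

// Constructor-injection variant suggested in the review.
final class InjectedHandlerSystem extends BaseSystem {
    private final Thread.UncaughtExceptionHandler handler;

    InjectedHandlerSystem(Thread.UncaughtExceptionHandler handler) {
        super(); // BaseSystem() runs first, while this.handler is still null
        this.handler = handler;
    }

    @Override
    public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
        return handler;
    }
}
```

If that assumption holds for the real base class, the anonymous subclass is a workaround for initialization order rather than a style choice.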
##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RobustActorSystemTest.java:
##########
@@ -84,6 +84,34 @@ void testUncaughtExceptionHandlerFromActor() {
assertThat(uncaughtException).isSameAs(error);
}
+ @Test
+ void testHonorClassloadingErrorBeforeShutdown() {
+ robustActorSystem
+ .uncaughtExceptionHandler()
+ .uncaughtException(Thread.currentThread(), new NoClassDefFoundError(""));
+
+ assertThat(testingUncaughtExceptionHandler.findUncaughtExceptionNow()).isPresent();
+ }
+
+ @Test
+ void testIgnoreClassloadingErrorAfterShutdown() {
Review Comment:
nit: this test could be split into two test methods (or turned into a
parameterized test), e.g.:
```
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ParameterizedTest
@ValueSource(classes = {NoClassDefFoundError.class, ClassNotFoundException.class})
void testIgnoreClassloadingErrorAfterShutdown(Class<? extends Throwable> clazz)
        throws ReflectiveOperationException {
    // terminate the actor system and wait until shutdown has completed
    robustActorSystem.terminate();
    robustActorSystem.getWhenTerminated().toCompletableFuture().join();

    // class-loading errors reported after shutdown must not reach the delegate handler
    robustActorSystem
            .uncaughtExceptionHandler()
            .uncaughtException(
                    Thread.currentThread(), clazz.getDeclaredConstructor().newInstance());
    assertThat(testingUncaughtExceptionHandler.findUncaughtExceptionNow()).isEmpty();
}
```
--
This is an automated message from the Apache Git Service.