XComp commented on code in PR #22764:
URL: https://github.com/apache/flink/pull/22764#discussion_r1228191494


##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java:
##########
@@ -85,17 +86,52 @@ private static RobustActorSystem create(
                                 .map(BootstrapSetup::defaultExecutionContext)
                                 .flatMap(RobustActorSystem::toJavaOptional));
 
+        final PostShutdownClassLoadingErrorFilter postShutdownClassLoadingErrorFilter =
+                new PostShutdownClassLoadingErrorFilter(uncaughtExceptionHandler);
+
         final RobustActorSystem robustActorSystem =
                 new RobustActorSystem(name, appConfig, classLoader, defaultEC, setup) {
                     @Override
                     public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
-                        return uncaughtExceptionHandler.getOrElse(super::uncaughtExceptionHandler);
+                        return postShutdownClassLoadingErrorFilter;
                     }
                 };
+        robustActorSystem.registerOnTermination(
+                postShutdownClassLoadingErrorFilter::notifyShutdownComplete);
+
         robustActorSystem.start();
         return robustActorSystem;
     }
 
+    private static class PostShutdownClassLoadingErrorFilter
+            implements Thread.UncaughtExceptionHandler {
+
+        private final AtomicBoolean shutdownComplete = new AtomicBoolean();
+        private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+        public PostShutdownClassLoadingErrorFilter(
+                Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+            this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+        }
+
+        public void notifyShutdownComplete() {
+            shutdownComplete.set(true);
+        }
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            if (shutdownComplete.get()
+                    && (e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException)) {

Review Comment:
   I'm wondering whether we should make this check more restrictive by also requiring that the thread's name contains `akka`. So far, the issues reported under FLINK-32189 have only shown `flink-akka` and `flink-metrics-akka` threads as the current thread.
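
A minimal sketch of what the stricter check could look like, assuming a plain substring match on the thread name is good enough. The class name `AkkaThreadClassLoadingErrorFilter`, the `delegate` field, and the `isIgnorable` helper are hypothetical and not part of the PR:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical variant of the filter from this PR; all names are illustrative only. */
class AkkaThreadClassLoadingErrorFilter implements Thread.UncaughtExceptionHandler {

    private final AtomicBoolean shutdownComplete = new AtomicBoolean();
    private final Thread.UncaughtExceptionHandler delegate;

    AkkaThreadClassLoadingErrorFilter(Thread.UncaughtExceptionHandler delegate) {
        this.delegate = delegate;
    }

    void notifyShutdownComplete() {
        shutdownComplete.set(true);
    }

    /** True only for class-loading errors raised on an akka thread after shutdown. */
    boolean isIgnorable(Thread t, Throwable e) {
        return shutdownComplete.get()
                && t.getName().contains("akka") // assumption: substring match on the thread name
                && (e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException);
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // Everything that is not explicitly ignorable is still forwarded.
        if (!isIgnorable(t, e)) {
            delegate.uncaughtException(t, e);
        }
    }
}
```

With this, a `NoClassDefFoundError` on a non-akka thread would still reach the delegate after shutdown, which might be the safer default.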



##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java:
##########


Review Comment:
   This isn't part of the change, but why do we have to declare `RobustActorSystem` as abstract and override `uncaughtExceptionHandler()` in an anonymous subclass, instead of passing the exception handler through the constructor and implementing the method properly? :thinking:
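
Roughly the shape I have in mind, as a sketch only. `BaseActorSystem` is a hypothetical stand-in for the Akka base class so that the snippet is self-contained, and `ConcreteRobustActorSystem` is an illustrative name. Whether the real base class already calls `uncaughtExceptionHandler()` during its own construction (which would make a plain field unsafe) is an open question here:

```java
import java.util.Objects;

/** Hypothetical stand-in for the Akka base class; not the real API. */
abstract class BaseActorSystem {
    protected abstract Thread.UncaughtExceptionHandler uncaughtExceptionHandler();
}

/** Illustrative non-abstract variant that receives its handler via the constructor. */
final class ConcreteRobustActorSystem extends BaseActorSystem {

    private final Thread.UncaughtExceptionHandler handler;

    ConcreteRobustActorSystem(Thread.UncaughtExceptionHandler handler) {
        this.handler = Objects.requireNonNull(handler, "handler");
    }

    @Override
    public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
        return handler;
    }
}
```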



##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RobustActorSystemTest.java:
##########
@@ -84,6 +84,34 @@ void testUncaughtExceptionHandlerFromActor() {
         assertThat(uncaughtException).isSameAs(error);
     }
 
+    @Test
+    void testHonorClassloadingErrorBeforeShutdown() {
+        robustActorSystem
+                .uncaughtExceptionHandler()
+                .uncaughtException(Thread.currentThread(), new NoClassDefFoundError(""));
+
+        assertThat(testingUncaughtExceptionHandler.findUncaughtExceptionNow()).isPresent();
+    }
+
+    @Test
+    void testIgnoreClassloadingErrorAfterShutdown() {

Review Comment:
   nit: this test could be split into two test methods (or turned into a parameterized test), e.g.:
   
   ```
       @ParameterizedTest
       @ValueSource(classes = {NoClassDefFoundError.class, ClassNotFoundException.class})
       void testIgnoreClassloadingErrorAfterShutdown(Class<? extends Throwable> clazz)
               throws ReflectiveOperationException {
           // wait for termination
           robustActorSystem.terminate();
           robustActorSystem.getWhenTerminated().toCompletableFuture().join();
   
           // Class#newInstance() is deprecated since Java 9, so go through the no-arg constructor
           robustActorSystem
                   .uncaughtExceptionHandler()
                   .uncaughtException(
                           Thread.currentThread(), clazz.getDeclaredConstructor().newInstance());
   
           assertThat(testingUncaughtExceptionHandler.findUncaughtExceptionNow()).isEmpty();
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
