tillrohrmann commented on a change in pull request #17053:
URL: https://github.com/apache/flink/pull/17053#discussion_r699412949



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
##########
@@ -44,4 +58,27 @@ private void runZookeeperPathGenerationTest(
 
         assertThat(result, is(expectedValue));
     }
+
+    @Test
+    public void testStartCuratorFrameworkFailed() throws Exception {
+        try {
+            final SystemExitTrackingSecurityManager trackingSecurityManager =
+                    new SystemExitTrackingSecurityManager();
+            System.setSecurityManager(trackingSecurityManager);
+            Configuration configuration = new Configuration();
+            configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "localhost:2181");
+            CuratorZookeeperClient mockedCuratorZookeeperClient =
+                    PowerMockito.mock(CuratorZookeeperClient.class);
+            PowerMockito.doThrow(new RuntimeException()).when(mockedCuratorZookeeperClient).start();
+            PowerMockito.whenNew(CuratorZookeeperClient.class)
+                    .withAnyArguments()
+                    .thenReturn(mockedCuratorZookeeperClient);

Review comment:
       Instead of using PowerMockito, can we set a special `ZookeeperFactory` in the `CuratorFrameworkFactory.Builder`? I guess we would need to make `ZooKeeperUtils` a bit easier to test (e.g. by introducing a package-private `startCuratorFramework(CuratorFrameworkFactory.Builder)` method).
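A minimal, self-contained sketch of the pattern suggested above: the test swaps the client factory on the builder and calls a package-private overload that takes the builder, so no constructor interception via `PowerMockito.whenNew` is needed. `ZkClient`, `ClientFactory` and `Builder` are hypothetical stand-ins for Curator's client, its `ZookeeperFactory` hook and `CuratorFrameworkFactory.Builder`; they are not Curator's real types.

```java
/**
 * Toy model of injecting a client factory through the builder instead of using
 * PowerMockito.whenNew. All nested types are hypothetical stand-ins, not Curator classes.
 */
public class InjectableFactorySketch {

    /** Stand-in for the ZooKeeper client that Curator creates internally. */
    interface ZkClient {
        void start() throws Exception;
    }

    /** Stand-in for Curator's ZookeeperFactory hook. */
    interface ClientFactory {
        ZkClient create(String connectString);
    }

    /** Stand-in for CuratorFrameworkFactory.Builder. */
    static final class Builder {
        private String connectString = "localhost:2181";
        // Default factory: returns a client whose start() succeeds (no real connection here).
        private ClientFactory factory = cs -> () -> {};

        Builder connectString(String value) {
            this.connectString = value;
            return this;
        }

        Builder clientFactory(ClientFactory value) {
            this.factory = value;
            return this;
        }

        ZkClient build() {
            return factory.create(connectString);
        }
    }

    /** Public entry point; real code would derive the builder from the Configuration. */
    static ZkClient startClient(String quorum) throws Exception {
        return startClient(new Builder().connectString(quorum));
    }

    /** Package-private overload that a test can call with a customized builder. */
    static ZkClient startClient(Builder builder) throws Exception {
        ZkClient client = builder.build();
        client.start();
        return client;
    }

    /** Returns the failure message if starting fails, or null if it succeeds. */
    static String startFailureMessage(Builder builder) {
        try {
            startClient(builder);
            return null;
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The test injects a factory whose client fails in start(); no bytecode manipulation needed.
        Builder failing =
                new Builder()
                        .clientFactory(
                                cs -> () -> {
                                    throw new RuntimeException("start failed for " + cs);
                                });
        System.out.println(startFailureMessage(failing));
        System.out.println(startFailureMessage(new Builder()));
    }
}
```

In the real code the analogous hook would presumably be Curator's `CuratorFrameworkFactory.Builder#zookeeperFactory(...)`, with the production entry point building the builder from the `Configuration` and delegating to the overload.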

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -226,9 +230,21 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
         }
 
         CuratorFramework cf = curatorFrameworkBuilder.build();
-
+        // This handler is only used to handle the error during start phase, and should be
+        // removed after start curator success.
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Exiting process for unhandled error in start curator framework, "
+                                    + "error message: {}, exiting code: {}",
+                            message,
+                            ZOOKEEPER_FAILURE_EXIT_CODE,
+                            throwable);
+                    System.exit(ZOOKEEPER_FAILURE_EXIT_CODE);
+                };
+        cf.getUnhandledErrorListenable().addListener(unhandledErrorListener);
         cf.start();
-
+        cf.getUnhandledErrorListenable().removeListener(unhandledErrorListener);

Review comment:
       What if an error happens after this line but before we register another 
error handler?
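To make the window concrete, here is a single-threaded toy model (in reality the error would come from Curator's background threads). `ErrorListenable` is a hypothetical stand-in for `cf.getUnhandledErrorListenable()`. It fires an error after each hand-over step and reports the fewest listeners that saw it: removing the start-up listener before adding the long-lived one leaves a moment with no handler at all, while adding first never does.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/**
 * Toy model of the hand-over between the start-up error listener and a long-lived one.
 * ErrorListenable is a hypothetical stand-in for Curator's unhandled-error listenable.
 */
public class ListenerHandoverSketch {

    static final class ErrorListenable {
        private final List<BiConsumer<String, Throwable>> listeners = new CopyOnWriteArrayList<>();

        void addListener(BiConsumer<String, Throwable> listener) {
            listeners.add(listener);
        }

        void removeListener(BiConsumer<String, Throwable> listener) {
            listeners.remove(listener);
        }

        /** Delivers an error to every registered listener and returns how many received it. */
        int fire(String message, Throwable error) {
            int delivered = 0;
            for (BiConsumer<String, Throwable> listener : listeners) {
                listener.accept(message, error);
                delivered++;
            }
            return delivered;
        }
    }

    /**
     * Simulates an error after each hand-over step and returns the smallest number of
     * listeners that saw it; 0 means an error at that moment would have gone unhandled.
     */
    static int minListenersDuringHandover(boolean registerLongLivedFirst) {
        ErrorListenable errors = new ErrorListenable();
        BiConsumer<String, Throwable> startupListener = (m, t) -> {};
        BiConsumer<String, Throwable> longLivedListener = (m, t) -> {};
        RuntimeException failure = new RuntimeException("connection loss");

        errors.addListener(startupListener);
        int min = errors.fire("after start", failure);
        if (registerLongLivedFirst) {
            errors.addListener(longLivedListener);
            min = Math.min(min, errors.fire("after adding long-lived listener", failure));
            errors.removeListener(startupListener);
            min = Math.min(min, errors.fire("after removing start-up listener", failure));
        } else {
            errors.removeListener(startupListener);
            min = Math.min(min, errors.fire("after removing start-up listener", failure));
            errors.addListener(longLivedListener);
            min = Math.min(min, errors.fire("after adding long-lived listener", failure));
        }
        return min;
    }

    public static void main(String[] args) {
        System.out.println("remove, then add: " + minListenersDuringHandover(false));
        System.out.println("add, then remove: " + minListenersDuringHandover(true));
    }
}
```

Under this model, either registering the long-lived handler before removing the start-up one or simply never removing the start-up handler avoids the gap.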

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -226,9 +230,21 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
         }
 
         CuratorFramework cf = curatorFrameworkBuilder.build();
-
+        // This handler is only used to handle the error during start phase, and should be
+        // removed after start curator success.
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Exiting process for unhandled error in start curator framework, "
+                                    + "error message: {}, exiting code: {}",
+                            message,
+                            ZOOKEEPER_FAILURE_EXIT_CODE,
+                            throwable);
+                    System.exit(ZOOKEEPER_FAILURE_EXIT_CODE);

Review comment:
       We probably should use `FlinkSecurityManager.forceProcessExit`.
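One possible shape for this, sketched under assumptions: the listener receives its exit action as an `IntConsumer` instead of hard-coding `System.exit`, so production code could pass `FlinkSecurityManager::forceProcessExit` (assumed here to accept an `int` exit code) while a test passes a recorder and does not need to install a `SecurityManager`. The exit code value below is a hypothetical placeholder; the PR defines its own `ZOOKEEPER_FAILURE_EXIT_CODE`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.IntConsumer;

/** Sketch: the start-up error listener receives its exit action instead of calling System.exit. */
public class ExitActionSketch {

    // Hypothetical placeholder value; the PR defines its own ZOOKEEPER_FAILURE_EXIT_CODE.
    static final int ZOOKEEPER_FAILURE_EXIT_CODE = 42;

    static BiConsumer<String, Throwable> startupErrorListener(IntConsumer exitAction) {
        return (message, throwable) -> {
            System.err.println(
                    "Exiting process for unhandled error while starting the Curator framework: "
                            + message);
            exitAction.accept(ZOOKEEPER_FAILURE_EXIT_CODE);
        };
    }

    /** Fires the listener with a recording exit action and returns the requested exit code. */
    static int requestedExitCode(String message) {
        AtomicInteger recorded = new AtomicInteger(-1);
        startupErrorListener(recorded::set).accept(message, new RuntimeException(message));
        return recorded.get();
    }

    public static void main(String[] args) {
        // Production wiring might be startupErrorListener(FlinkSecurityManager::forceProcessExit),
        // assuming that signature; it is not referenced here so the sketch compiles standalone.
        System.out.println("exit code requested: " + requestedExitCode("connection refused"));
    }
}
```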

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -226,9 +230,21 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
         }
 
         CuratorFramework cf = curatorFrameworkBuilder.build();
-
+        // This handler is only used to handle the error during start phase, and should be
+        // removed after start curator success.
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Exiting process for unhandled error in start curator framework, "
+                                    + "error message: {}, exiting code: {}",
+                            message,
+                            ZOOKEEPER_FAILURE_EXIT_CODE,
+                            throwable);
+                    System.exit(ZOOKEEPER_FAILURE_EXIT_CODE);
+                };
+        cf.getUnhandledErrorListenable().addListener(unhandledErrorListener);

Review comment:
       I am actually wondering whether we should register only this one handler and remove the listeners in `ZooKeeperLeaderElectionDriver` and `ZooKeeperLeaderRetrievalDriver`, because these two listeners are called for every unhandled exception that occurs in Curator, regardless of its actual source. Moreover, the current `UnhandledErrorListener` implementations eventually call the `FatalExitExceptionHandler` anyway.
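A toy model of the point above, assuming (as the comment states) that every registered unhandled-error listener is notified regardless of which component caused the error. `ErrorListenable` and the listener lambdas are hypothetical stand-ins; each listener simply counts as one escalation to the fatal handler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/**
 * Toy model: an unhandled-error listenable notifies every registered listener,
 * whichever component caused the error. All names are hypothetical stand-ins.
 */
public class SingleErrorHandlerSketch {

    static final class ErrorListenable {
        private final List<BiConsumer<String, Throwable>> listeners = new CopyOnWriteArrayList<>();

        void addListener(BiConsumer<String, Throwable> listener) {
            listeners.add(listener);
        }

        void fire(String message, Throwable error) {
            for (BiConsumer<String, Throwable> listener : listeners) {
                listener.accept(message, error);
            }
        }
    }

    /** Each driver registers its own listener, and each one escalates to the fatal handler. */
    static int fatalCallsWithPerDriverListeners() {
        AtomicInteger fatalCalls = new AtomicInteger();
        ErrorListenable errors = new ErrorListenable();
        errors.addListener((m, t) -> fatalCalls.incrementAndGet()); // stands in for the election driver
        errors.addListener((m, t) -> fatalCalls.incrementAndGet()); // stands in for the retrieval driver
        // An error from an unrelated background operation still reaches both listeners.
        errors.fire("unrelated failure", new RuntimeException());
        return fatalCalls.get();
    }

    /** One process-wide listener, registered when the framework is created and never removed. */
    static int fatalCallsWithSingleHandler() {
        AtomicInteger fatalCalls = new AtomicInteger();
        ErrorListenable errors = new ErrorListenable();
        errors.addListener((m, t) -> fatalCalls.incrementAndGet());
        errors.fire("unrelated failure", new RuntimeException());
        return fatalCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("per-driver listeners: " + fatalCallsWithPerDriverListeners());
        System.out.println("single handler: " + fatalCallsWithSingleHandler());
    }
}
```

With a single long-lived handler, every unhandled Curator error is escalated exactly once, and the start-up listener never needs to be removed.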




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
