Aitozi commented on a change in pull request #17053:
URL: https://github.com/apache/flink/pull/17053#discussion_r699104914



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -226,9 +230,21 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
         }
 
         CuratorFramework cf = curatorFrameworkBuilder.build();
-
+        // This handler is only used to handle the error during start phase, and should be
+        // removed after start curator success.
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Exiting process for unhandled error in start curator framework, "
+                                    + "error message: {}, exiting code: {}",
+                            message,
+                            ZOOKEEPER_FAILURE_EXIT_CODE,
+                            throwable);
+                    System.exit(ZOOKEEPER_FAILURE_EXIT_CODE);

Review comment:
       For now I just call `System.exit()` here. Another option would be to throw an exception and let it be handled in the entrypoint. What do you think? @tillrohrmann
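A minimal sketch of the second option (throw, and let the entrypoint decide how to exit), assuming, as the test in this PR suggests, that a failing `start()` is reported to the unhandled-error listener rather than thrown to the caller. `FakeClient`, `setUnhandledErrorListener` and `startOrThrow` are hypothetical stand-ins, not Curator APIs; a plain `BiConsumer<String, Throwable>` plays the role of `UnhandledErrorListener`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

/** Hypothetical sketch: surface start-phase errors to the caller instead of exiting inside the listener. */
final class StartFailureCapture {

    /** Hypothetical client: start() does not throw, it reports failures to its listener. */
    static final class FakeClient {
        private final RuntimeException failure; // null means start() succeeds
        private BiConsumer<String, Throwable> listener = (message, t) -> {};

        FakeClient(RuntimeException failure) {
            this.failure = failure;
        }

        void setUnhandledErrorListener(BiConsumer<String, Throwable> listener) {
            this.listener = listener;
        }

        void start() {
            if (failure != null) {
                listener.accept("start failed", failure); // reported to the listener, not thrown
            }
        }
    }

    /** Starts the client and rethrows the first start-phase error so the entrypoint can handle it. */
    static FakeClient startOrThrow(FakeClient client) {
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        client.setUnhandledErrorListener((message, t) -> firstError.compareAndSet(null, t));
        client.start();
        Throwable error = firstError.get();
        if (error != null) {
            throw new IllegalStateException("Could not start the client", error);
        }
        return client;
    }
}
```

If the failure were instead reported on a background thread after `start()` returns, the captured reference could still be empty at the check, so a latch or future would be needed.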

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -226,9 +230,21 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
         }
 
         CuratorFramework cf = curatorFrameworkBuilder.build();
-
+        // This handler is only used to handle the error during start phase, and should be
+        // removed after start curator success.
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Exiting process for unhandled error in start curator framework, "
+                                    + "error message: {}, exiting code: {}",
+                            message,
+                            ZOOKEEPER_FAILURE_EXIT_CODE,
+                            throwable);
+                    System.exit(ZOOKEEPER_FAILURE_EXIT_CODE);
+                };
+        cf.getUnhandledErrorListenable().addListener(unhandledErrorListener);
         cf.start();
-
+        cf.getUnhandledErrorListenable().removeListener(unhandledErrorListener);

Review comment:
       I originally added this line so that this handler would not interfere with the other error handlers once `LeaderElectionService` and `LeaderRetrievalService` have registered theirs.
   But I think you are right: an error may still happen between `start()` and the registration of the other handlers, so I will remove this line.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
##########
@@ -44,4 +58,27 @@ private void runZookeeperPathGenerationTest(
 
         assertThat(result, is(expectedValue));
     }
+
+    @Test
+    public void testStartCuratorFrameworkFailed() throws Exception {
+        try {
+            final SystemExitTrackingSecurityManager trackingSecurityManager =
+                    new SystemExitTrackingSecurityManager();
+            System.setSecurityManager(trackingSecurityManager);
+            Configuration configuration = new Configuration();
+            configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "localhost:2181");
+            CuratorZookeeperClient mockedCuratorZookeeperClient =
+                    PowerMockito.mock(CuratorZookeeperClient.class);
+            PowerMockito.doThrow(new RuntimeException()).when(mockedCuratorZookeeperClient).start();
+            PowerMockito.whenNew(CuratorZookeeperClient.class)
+                    .withAnyArguments()
+                    .thenReturn(mockedCuratorZookeeperClient);

Review comment:
       Good point 👍

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -226,9 +230,21 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
         }
 
         CuratorFramework cf = curatorFrameworkBuilder.build();
-
+        // This handler is only used to handle the error during start phase, and should be
+        // removed after start curator success.
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Exiting process for unhandled error in start curator framework, "
+                                    + "error message: {}, exiting code: {}",
+                            message,
+                            ZOOKEEPER_FAILURE_EXIT_CODE,
+                            throwable);
+                    System.exit(ZOOKEEPER_FAILURE_EXIT_CODE);
+                };
+        cf.getUnhandledErrorListenable().addListener(unhandledErrorListener);

Review comment:
       I don't quite get your meaning; maybe you mean `should only`?
   
   I think we could safely remove the listeners in `leaderElectionDriver` and `leaderRetrievalDriver`, as you said: all three handlers now exit the process, so once one of them runs, the others will never be executed. There is no point in registering the other handlers.
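A toy model of the argument above, assuming every registered handler terminates the process the way `System.exit` does and that listeners are notified in registration order. `SimulatedExit` and `fire` are hypothetical; throwing stands in for exiting so the effect can be observed: only the first listener ever runs, so registering the others adds nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical model of several listeners that all terminate the process on an unhandled error. */
final class ExitingListenerDemo {

    /** Stands in for System.exit: unwinds the notifying thread instead of halting the JVM. */
    static final class SimulatedExit extends RuntimeException {
        final int code;

        SimulatedExit(int code) {
            super("simulated exit " + code);
            this.code = code;
        }
    }

    /** Registers one exiting listener per name, fires them in order, and returns which ones ran. */
    static List<String> fire(List<String> names, int exitCode) {
        List<String> ran = new ArrayList<>();
        List<BiConsumer<String, Throwable>> listeners = new ArrayList<>();
        for (String name : names) {
            listeners.add((message, t) -> {
                ran.add(name);
                throw new SimulatedExit(exitCode); // every handler "exits the process"
            });
        }
        try {
            for (BiConsumer<String, Throwable> listener : listeners) {
                listener.accept("unhandled error", new RuntimeException("boom"));
            }
        } catch (SimulatedExit expected) {
            // In a real process the JVM would be gone here; later listeners never ran.
        }
        return ran;
    }
}
```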

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -226,9 +230,21 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
         }
 
         CuratorFramework cf = curatorFrameworkBuilder.build();
-
+        // This handler is only used to handle the error during start phase, and should be
+        // removed after start curator success.
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Exiting process for unhandled error in start curator framework, "
+                                    + "error message: {}, exiting code: {}",
+                            message,
+                            ZOOKEEPER_FAILURE_EXIT_CODE,
+                            throwable);
+                    System.exit(ZOOKEEPER_FAILURE_EXIT_CODE);

Review comment:
       A small question: how can we test the exit behavior of `FlinkSecurityManager.forceProcessExit`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -224,11 +229,30 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
             curatorFrameworkBuilder.connectionStateErrorPolicy(
                     new SessionConnectionStateErrorPolicy());
         }
+        return startCuratorFramework(curatorFrameworkBuilder);
+    }
 
-        CuratorFramework cf = curatorFrameworkBuilder.build();
-
+    /**
+     * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum from
+     * a builder.
+     *
+     * @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework.
+     * @return {@link CuratorFramework} instance
+     */
+    static CuratorFramework startCuratorFramework(CuratorFrameworkFactory.Builder builder) {
+        CuratorFramework cf = builder.build();
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Exiting process for unhandled error in curator framework, "
+                                    + "error message: {}, exiting code: {}",
+                            message,
+                            ZOOKEEPER_FAILURE_EXIT_CODE,
+                            throwable);
+                    FlinkSecurityManager.forceProcessExit(ZOOKEEPER_FAILURE_EXIT_CODE);

Review comment:
       Got it, I will add the `FatalErrorHandler`.
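A minimal sketch of forwarding unhandled errors to an injected handler instead of calling `FlinkSecurityManager.forceProcessExit` directly, which also makes the exit path testable: a test passes a handler that records the error instead of exiting. The `FatalErrorHandler` below is a local stand-in assumed to mirror the single `onFatalError(Throwable)` method of Flink's interface; `unhandledErrorListener` and `RecordingFatalErrorHandler` are hypothetical names, and a `BiConsumer<String, Throwable>` plays the role of Curator's `UnhandledErrorListener`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical sketch: route unhandled client errors to an injected fatal error handler. */
final class FatalErrorForwarding {

    /** Local stand-in assumed to mirror Flink's FatalErrorHandler (one onFatalError method). */
    interface FatalErrorHandler {
        void onFatalError(Throwable exception);
    }

    /** Builds a listener that wraps the error with its message and forwards it to the handler. */
    static BiConsumer<String, Throwable> unhandledErrorListener(FatalErrorHandler handler) {
        return (message, throwable) ->
                handler.onFatalError(
                        new IllegalStateException(
                                "Unhandled error in curator framework: " + message, throwable));
    }

    /** Test double: records the first fatal error instead of terminating the JVM. */
    static final class RecordingFatalErrorHandler implements FatalErrorHandler {
        final CompletableFuture<Throwable> error = new CompletableFuture<>();

        @Override
        public void onFatalError(Throwable exception) {
            error.complete(exception); // later calls are ignored once completed
        }
    }
}
```

In production the injected handler would be the one that actually terminates the process, so the exit decision lives in one place and the listener itself never calls `System.exit`.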



