tillrohrmann commented on a change in pull request #17053:
URL: https://github.com/apache/flink/pull/17053#discussion_r701175663



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -224,11 +229,34 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
             curatorFrameworkBuilder.connectionStateErrorPolicy(
                     new SessionConnectionStateErrorPolicy());
         }
+        return startCuratorFramework(curatorFrameworkBuilder, fatalErrorHandler);
+    }
 
-        CuratorFramework cf = curatorFrameworkBuilder.build();
-
+    /**
+     * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum from
+     * a builder.
+     *
+     * @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework.
+     * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected
+     *     error of {@link CuratorFramework}

Review comment:
       ```suggestion
        *     errors of {@link CuratorFramework}
       ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -149,9 +151,12 @@ public static String generateLeaderLatchPath(String path) {
      * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum.
      *
      * @param configuration {@link Configuration} object containing the configuration values
+     * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected
+     *     error of {@link CuratorFramework}

Review comment:
       ```suggestion
        *     errors of {@link CuratorFramework}
       ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
##########
@@ -44,4 +50,29 @@ private void runZookeeperPathGenerationTest(
 
         assertThat(result, is(expectedValue));
     }
+
+    @Test
+    public void testStartCuratorFrameworkFailed() throws Exception {
+        try {
+            final int exitCode = 123;
+            final SystemExitTrackingSecurityManager trackingSecurityManager =
+                    new SystemExitTrackingSecurityManager();
+            System.setSecurityManager(trackingSecurityManager);
+            final CuratorFrameworkFactory.Builder curatorFrameworkBuilder =
+                    CuratorFrameworkFactory.builder()
+                            .connectString("localhost:2181")
+                            .retryPolicy(new ExponentialBackoffRetry(1, 1))
+                            .zookeeperFactory(
+                                    (s, i, watcher, b) -> {
+                                        throw new RuntimeException();
+                                    })
+                            .namespace("flink");
+            ZooKeeperUtils.startCuratorFramework(
+                    curatorFrameworkBuilder, exception -> System.exit(exitCode));

Review comment:
       I think we could use a `TestingFatalErrorHandler` here; then we would not need the `SystemExitTrackingSecurityManager` magic. In the assertion we can use `TestingFatalErrorHandler.getErrorFuture()`.
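
A minimal sketch of the pattern suggested here, using local stand-ins rather than Flink's real classes. `FatalErrorHandler` is redeclared locally as a single-method interface, `RecordingFatalErrorHandler` is a hypothetical substitute for `TestingFatalErrorHandler`, and `startClient` is an invented placeholder for the code under test. The point is only that the test can assert on a future instead of intercepting `System.exit`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Local stand-in for Flink's FatalErrorHandler interface.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical stand-in for TestingFatalErrorHandler: records the first error in a future.
final class RecordingFatalErrorHandler implements FatalErrorHandler {
    private final CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();

    @Override
    public void onFatalError(Throwable exception) {
        // complete() only takes effect once, so later errors do not overwrite the first.
        errorFuture.complete(exception);
    }

    CompletableFuture<Throwable> getErrorFuture() {
        return errorFuture;
    }
}

final class FatalErrorHandlerSketch {
    // Invented placeholder for the code under test: it fails and reports to the handler.
    static void startClient(FatalErrorHandler handler) {
        try {
            throw new RuntimeException("simulated startup failure");
        } catch (RuntimeException e) {
            handler.onFatalError(e);
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingFatalErrorHandler handler = new RecordingFatalErrorHandler();
        startClient(handler);
        // Assert on the future; the timeout keeps a broken test from hanging.
        Throwable error = handler.getErrorFuture().get(1, TimeUnit.SECONDS);
        System.out.println("captured: " + error.getMessage());
    }
}
```

With this shape the test needs no security manager, and nothing has to be restored in a `finally` block.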

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -149,9 +151,12 @@ public static String generateLeaderLatchPath(String path) {
      * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum.
      *
      * @param configuration {@link Configuration} object containing the configuration values
+     * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected

Review comment:
       ```suggestion
        * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unexpected
       ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java
##########
@@ -53,7 +53,8 @@ public void testRecoveredAfterConnectionLoss() throws Exception {
         final Configuration configuration = new Configuration();
         configuration.setString(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
-        final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+        final CuratorFramework client =
+                ZooKeeperUtils.startCuratorFramework(configuration, exception -> {});

Review comment:
       Maybe we could add something like a `TestingFatalErrorHandler.IGNORE` that is effectively `exception -> {}`.
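
A minimal sketch of such a shared no-op constant. The interface is a local stand-in for Flink's `FatalErrorHandler`, and the holder class `IgnoringHandlers` is a hypothetical name chosen for illustration; only the constant name `IGNORE` comes from the comment above:

```java
// Local stand-in for Flink's FatalErrorHandler interface.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical holder for a shared no-op handler.
final class IgnoringHandlers {
    // Equivalent to writing `exception -> {}` at every call site.
    static final FatalErrorHandler IGNORE = exception -> {};

    private IgnoringHandlers() {}
}

final class IgnoreHandlerSketch {
    public static void main(String[] args) {
        // Call sites pass the named constant, which documents the intent to ignore errors.
        IgnoringHandlers.IGNORE.onFatalError(new RuntimeException("ignored"));
        System.out.println("no-op handler returned normally");
    }
}
```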

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -224,11 +229,34 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
             curatorFrameworkBuilder.connectionStateErrorPolicy(
                     new SessionConnectionStateErrorPolicy());
         }
+        return startCuratorFramework(curatorFrameworkBuilder, fatalErrorHandler);
+    }
 
-        CuratorFramework cf = curatorFrameworkBuilder.build();
-
+    /**
+     * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum from
+     * a builder.
+     *
+     * @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework.
+     * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected
+     *     error of {@link CuratorFramework}
+     * @return {@link CuratorFramework} instance
+     */
+    static CuratorFramework startCuratorFramework(
+            CuratorFrameworkFactory.Builder builder, FatalErrorHandler fatalErrorHandler) {
+        CuratorFramework cf = builder.build();
+        UnhandledErrorListener unhandledErrorListener =
+                (message, throwable) -> {
+                    LOG.error(
+                            "Unhandled error in curator framework, error message: {}",
+                            message,
+                            throwable);
+                    // The exception thrown in UnhandledErrorListener will be catch by

Review comment:
       Which exception is thrown here?

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
##########
@@ -184,12 +184,32 @@ public RestClusterClient(
             T clusterId,
             WaitStrategy waitStrategy)
             throws Exception {
-        this(
-                configuration,
-                restClient,
-                clusterId,
-                waitStrategy,
-                HighAvailabilityServicesUtils.createClientHAService(configuration));
+        this.configuration = checkNotNull(configuration);
+
+        this.restClusterClientConfiguration =
+                RestClusterClientConfiguration.fromConfiguration(configuration);
+
+        if (restClient != null) {
+            this.restClient = restClient;
+        } else {
+            this.restClient = new RestClient(configuration, executorService);
+        }
+
+        this.waitStrategy = checkNotNull(waitStrategy);
+        this.clusterId = checkNotNull(clusterId);
+
+        this.clientHAServices =
+                HighAvailabilityServicesUtils.createClientHAService(
+                        configuration,
+                        exception ->
+                                webMonitorLeaderRetriever.handleError(
+                                        new FlinkException(exception)));
+
+        this.webMonitorRetrievalService = clientHAServices.getClusterRestEndpointLeaderRetriever();
+        this.retryExecutorService =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
+        startLeaderRetrievers();

Review comment:
       This does not feel right: we are effectively copying the body of the next constructor. I think there must be another solution. Maybe, if no `ClientHighAvailabilityServices` is specified, we could instantiate it with a handler that terminates the process in case of an unhandled exception.
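
A minimal sketch of that alternative, with heavily simplified stand-ins: `ClientSketch` and `HaServicesSketch` are hypothetical classes, not `RestClusterClient` or `ClientHighAvailabilityServices`, and the exit code and log message are arbitrary assumptions. It only illustrates keeping field initialization in one constructor while the convenience constructor delegates with a default, process-terminating handler:

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for ClientHighAvailabilityServices: it only stores an error handler.
final class HaServicesSketch {
    final Consumer<Throwable> errorHandler;

    HaServicesSketch(Consumer<Throwable> errorHandler) {
        this.errorHandler = Objects.requireNonNull(errorHandler);
    }
}

// Hypothetical, heavily simplified client; not the real RestClusterClient.
final class ClientSketch {
    final String configuration;
    final HaServicesSketch haServices;

    // No HA services given: delegate with a default whose handler terminates the process.
    ClientSketch(String configuration) {
        this(configuration, new HaServicesSketch(ClientSketch::terminateProcess));
    }

    // The single place where fields are initialized.
    ClientSketch(String configuration, HaServicesSketch haServices) {
        this.configuration = Objects.requireNonNull(configuration);
        this.haServices = Objects.requireNonNull(haServices);
    }

    // Assumed default policy: report the error and exit with a non-zero code.
    static void terminateProcess(Throwable error) {
        System.err.println("Unhandled HA services error: " + error);
        System.exit(1);
    }

    public static void main(String[] args) {
        ClientSketch client = new ClientSketch("demo-configuration");
        System.out.println("HA services created: " + (client.haServices != null));
    }
}
```

Because the default handler is a static method reference, it can be passed inside the `this(...)` call without touching instance state that is not yet initialized.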

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java
##########
@@ -53,7 +53,8 @@ public void testRecoveredAfterConnectionLoss() throws Exception {
         final Configuration configuration = new Configuration();
         configuration.setString(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
-        final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+        final CuratorFramework client =
+                ZooKeeperUtils.startCuratorFramework(configuration, exception -> {});

Review comment:
       There is already a `NoOpFatalErrorHandler`, which could be changed to be a top-level class.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -224,11 +229,34 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
             curatorFrameworkBuilder.connectionStateErrorPolicy(
                     new SessionConnectionStateErrorPolicy());
         }
+        return startCuratorFramework(curatorFrameworkBuilder, fatalErrorHandler);
+    }
 
-        CuratorFramework cf = curatorFrameworkBuilder.build();
-
+    /**
+     * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum from
+     * a builder.
+     *
+     * @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework.
+     * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected

Review comment:
       ```suggestion
        * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unexpected
       ```




