tillrohrmann commented on a change in pull request #17053:
URL: https://github.com/apache/flink/pull/17053#discussion_r701175663
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -224,11 +229,34 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
curatorFrameworkBuilder.connectionStateErrorPolicy(
new SessionConnectionStateErrorPolicy());
}
+ return startCuratorFramework(curatorFrameworkBuilder, fatalErrorHandler);
+ }
- CuratorFramework cf = curatorFrameworkBuilder.build();
-
+ /**
+ * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum from
+ * a builder.
+ *
+ * @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework.
+ * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected
+ * error of {@link CuratorFramework}
Review comment:
```suggestion
* errors of {@link CuratorFramework}
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -149,9 +151,12 @@ public static String generateLeaderLatchPath(String path) {
* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum.
*
* @param configuration {@link Configuration} object containing the configuration values
+ * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected
+ * error of {@link CuratorFramework}
Review comment:
```suggestion
* errors of {@link CuratorFramework}
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
##########
@@ -44,4 +50,29 @@ private void runZookeeperPathGenerationTest(
assertThat(result, is(expectedValue));
}
+
+ @Test
+ public void testStartCuratorFrameworkFailed() throws Exception {
+ try {
+ final int exitCode = 123;
+ final SystemExitTrackingSecurityManager trackingSecurityManager =
+ new SystemExitTrackingSecurityManager();
+ System.setSecurityManager(trackingSecurityManager);
+ final CuratorFrameworkFactory.Builder curatorFrameworkBuilder =
+ CuratorFrameworkFactory.builder()
+ .connectString("localhost:2181")
+ .retryPolicy(new ExponentialBackoffRetry(1, 1))
+ .zookeeperFactory(
+ (s, i, watcher, b) -> {
+ throw new RuntimeException();
+ })
+ .namespace("flink");
+ ZooKeeperUtils.startCuratorFramework(
+ curatorFrameworkBuilder, exception -> System.exit(exitCode));
Review comment:
I think we could use a `TestingFatalErrorHandler` and then we don't have
to do the `SystemExitTrackingSecurityManager` magic. In the assertion we can
use `TestingFatalErrorHandler.getErrorFuture()`.
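A minimal sketch of the suggested test shape, assuming a single-method fatal error callback. `FatalErrorHandler`, `RecordingFatalErrorHandler`, and `startClient` are hypothetical stand-ins (not Flink's `TestingFatalErrorHandler` or `ZooKeeperUtils`), and the startup failure is reported synchronously here rather than through Curator's listener. The point is only that the assertion waits on a future instead of trapping `System.exit`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical stand-ins; not Flink's or Curator's actual classes. */
public class FatalErrorFutureSketch {

    /** Assumed single-method fatal error callback. */
    public interface FatalErrorHandler {
        void onFatalError(Throwable exception);
    }

    /** Test handler that completes a future with the first reported error. */
    public static final class RecordingFatalErrorHandler implements FatalErrorHandler {
        private final CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();

        @Override
        public void onFatalError(Throwable exception) {
            // complete() is a no-op once the future is done, so only the first error is kept.
            errorFuture.complete(exception);
        }

        public CompletableFuture<Throwable> getErrorFuture() {
            return errorFuture;
        }
    }

    /** Simplified stand-in for startCuratorFramework: reports a failing client factory to the handler. */
    public static void startClient(Supplier<?> clientFactory, FatalErrorHandler handler) {
        try {
            clientFactory.get();
        } catch (RuntimeException e) {
            handler.onFatalError(e);
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingFatalErrorHandler handler = new RecordingFatalErrorHandler();
        RuntimeException failure = new RuntimeException("zookeeperFactory failed");

        startClient(() -> { throw failure; }, handler);

        // Wait on the future instead of trapping System.exit with a SecurityManager.
        Throwable reported = handler.getErrorFuture().get(1, TimeUnit.SECONDS);
        System.out.println(reported == failure); // prints true
    }
}
```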
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -149,9 +151,12 @@ public static String generateLeaderLatchPath(String path) {
* Starts a {@link CuratorFramework} instance and connects it to the given
ZooKeeper quorum.
*
* @param configuration {@link Configuration} object containing the
configuration values
+ * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected
Review comment:
```suggestion
* @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unexpected
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java
##########
@@ -53,7 +53,8 @@ public void testRecoveredAfterConnectionLoss() throws Exception {
final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperResource.getConnectString());
- final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+ final CuratorFramework client =
+ ZooKeeperUtils.startCuratorFramework(configuration, exception -> {});
Review comment:
Maybe we add something like a `TestingFatalErrorHandler.IGNORE` that is
effectively `exception -> {}`.
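A sketch of such a shared constant, assuming the handler is a functional interface so `exception -> {}` fits it. The holder class, `startClient`, and the interface are hypothetical stand-ins rather than Flink's test utilities:

```java
/** Hypothetical sketch; the names are stand-ins, not Flink's actual test utilities. */
public class IgnoreHandlerSketch {

    /** Assumed single-method fatal error callback. */
    @FunctionalInterface
    public interface FatalErrorHandler {
        void onFatalError(Throwable exception);
    }

    /** Test utility class holding a reusable no-op handler. */
    public static final class TestingFatalErrorHandler {
        /** Named equivalent of {@code exception -> {}} for tests that do not care about fatal errors. */
        public static final FatalErrorHandler IGNORE = exception -> {};

        private TestingFatalErrorHandler() {}
    }

    /** Stand-in for a method that requires a handler, such as startCuratorFramework(configuration, handler). */
    public static String startClient(String connectString, FatalErrorHandler handler) {
        handler.onFatalError(new IllegalStateException("simulated unhandled error"));
        return "client@" + connectString;
    }

    public static void main(String[] args) {
        // The call site states its intent instead of passing an anonymous empty lambda.
        String client = startClient("localhost:2181", TestingFatalErrorHandler.IGNORE);
        System.out.println(client); // prints client@localhost:2181
    }
}
```

Naming the constant makes each call site state its intent, and every test reuses one instance instead of declaring its own empty lambda.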
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -224,11 +229,34 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
curatorFrameworkBuilder.connectionStateErrorPolicy(
new SessionConnectionStateErrorPolicy());
}
+ return startCuratorFramework(curatorFrameworkBuilder, fatalErrorHandler);
+ }
- CuratorFramework cf = curatorFrameworkBuilder.build();
-
+ /**
+ * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum from
+ * a builder.
+ *
+ * @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework.
+ * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected
+ * error of {@link CuratorFramework}
+ * @return {@link CuratorFramework} instance
+ */
+ static CuratorFramework startCuratorFramework(
+ CuratorFrameworkFactory.Builder builder, FatalErrorHandler fatalErrorHandler) {
+ CuratorFramework cf = builder.build();
+ UnhandledErrorListener unhandledErrorListener =
+ (message, throwable) -> {
+ LOG.error(
+ "Unhandled error in curator framework, error message: {}",
+ message,
+ throwable);
+ // The exception thrown in UnhandledErrorListener will be catch by
Review comment:
Which exception is thrown here?
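For comparison, one possible shape in which the listener throws nothing at all: it logs and forwards the throwable to the handler, so no caller has to catch anything. The interfaces are hypothetical stand-ins shaped like the `(message, throwable)` listener in the diff, and printing to `System.err` stands in for `LOG.error`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; the interfaces are stand-ins mirroring the shapes in the diff. */
public class UnhandledErrorForwardingSketch {

    /** Stand-in with the same (message, throwable) shape as the listener in the diff. */
    @FunctionalInterface
    public interface UnhandledErrorListener {
        void unhandledError(String message, Throwable e);
    }

    /** Assumed single-method fatal error callback. */
    @FunctionalInterface
    public interface FatalErrorHandler {
        void onFatalError(Throwable exception);
    }

    /** Builds a listener that logs and hands the error to the handler without throwing. */
    public static UnhandledErrorListener forwardingListener(FatalErrorHandler handler) {
        return (message, throwable) -> {
            System.err.println("Unhandled error in curator framework, error message: " + message);
            handler.onFatalError(throwable);
        };
    }

    public static void main(String[] args) {
        List<Throwable> reported = new ArrayList<>();
        UnhandledErrorListener listener = forwardingListener(reported::add);
        listener.unhandledError("connection lost", new IllegalStateException("session expired"));
        System.out.println(reported.size()); // prints 1
    }
}
```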
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
##########
@@ -184,12 +184,32 @@ public RestClusterClient(
T clusterId,
WaitStrategy waitStrategy)
throws Exception {
- this(
- configuration,
- restClient,
- clusterId,
- waitStrategy,
- HighAvailabilityServicesUtils.createClientHAService(configuration));
+ this.configuration = checkNotNull(configuration);
+
+ this.restClusterClientConfiguration =
+ RestClusterClientConfiguration.fromConfiguration(configuration);
+
+ if (restClient != null) {
+ this.restClient = restClient;
+ } else {
+ this.restClient = new RestClient(configuration, executorService);
+ }
+
+ this.waitStrategy = checkNotNull(waitStrategy);
+ this.clusterId = checkNotNull(clusterId);
+
+ this.clientHAServices =
+ HighAvailabilityServicesUtils.createClientHAService(
+ configuration,
+ exception ->
+ webMonitorLeaderRetriever.handleError(
+ new FlinkException(exception)));
+
+ this.webMonitorRetrievalService = clientHAServices.getClusterRestEndpointLeaderRetriever();
+ this.retryExecutorService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
+ startLeaderRetrievers();
Review comment:
This does not feel right. We are effectively copying the next
constructor. I think there must be another solution. Maybe, if we don't specify
a `ClientHighAvailabilityServices`, then we instantiate it with a handler that
terminates the process in case of an unhandled exception.
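A sketch of that direction, assuming the convenience constructor can build the services with a static, process-terminating handler and then delegate with `this(...)`. `Client`, `HaServices`, and `terminateProcess` are hypothetical stand-ins for `RestClusterClient` and `ClientHighAvailabilityServices`:

```java
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical sketch; Client and HaServices are stand-ins, not RestClusterClient's real types. */
public class ConstructorDelegationSketch {

    /** Stand-in for ClientHighAvailabilityServices, created with a fatal error handler. */
    public static final class HaServices {
        private final Consumer<Throwable> fatalErrorHandler;

        public HaServices(Consumer<Throwable> fatalErrorHandler) {
            this.fatalErrorHandler = Objects.requireNonNull(fatalErrorHandler);
        }

        public void reportUnhandledError(Throwable error) {
            fatalErrorHandler.accept(error);
        }
    }

    public static final class Client {
        private final HaServices haServices;

        /** No services given: create them with a handler that terminates the process. */
        public Client(String configuration) {
            // Arguments to this(...) may only use static members, hence the static handler.
            this(configuration, new HaServices(Client::terminateProcess));
        }

        /** The single constructor that actually initializes the fields. */
        public Client(String configuration, HaServices haServices) {
            Objects.requireNonNull(configuration);
            this.haServices = Objects.requireNonNull(haServices);
        }

        public HaServices haServices() {
            return haServices;
        }

        private static void terminateProcess(Throwable error) {
            System.err.println("Unhandled error in HA services, terminating: " + error);
            System.exit(1);
        }
    }

    public static void main(String[] args) {
        Client client = new Client("rest.address=localhost");
        System.out.println(client.haServices() != null); // prints true
    }
}
```

Because arguments to `this(...)` cannot reference the instance under construction, a handler that captures a field such as `webMonitorLeaderRetriever` cannot be passed this way, which may be why the diff copies the constructor body. A static handler sidesteps that and keeps field initialization in one place.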
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java
##########
@@ -53,7 +53,8 @@ public void testRecoveredAfterConnectionLoss() throws Exception {
final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperResource.getConnectString());
- final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+ final CuratorFramework client =
+ ZooKeeperUtils.startCuratorFramework(configuration, exception -> {});
Review comment:
There is already the `NoOpFatalErrorHandler` that could be changed to be
a top-level class.
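A sketch of the no-op handler as an enum singleton, assuming the same single-method callback. It is nested here only to keep the example in one file; as the comment suggests, in the code base it would sit in its own file. All names are hypothetical stand-ins:

```java
/** Hypothetical sketch; the interface is a stand-in for a fatal error callback. */
public class NoOpHandlerSketch {

    /** Assumed single-method fatal error callback. */
    @FunctionalInterface
    public interface FatalErrorHandler {
        void onFatalError(Throwable exception);
    }

    /** Stateless no-op handler; an enum gives one shared, named instance. */
    public enum NoOpFatalErrorHandler implements FatalErrorHandler {
        INSTANCE;

        @Override
        public void onFatalError(Throwable exception) {
            // Intentionally ignore the error.
        }
    }

    public static void main(String[] args) {
        FatalErrorHandler handler = NoOpFatalErrorHandler.INSTANCE;
        handler.onFatalError(new RuntimeException("ignored"));
        System.out.println("still running"); // prints still running
    }
}
```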
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -224,11 +229,34 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
curatorFrameworkBuilder.connectionStateErrorPolicy(
new SessionConnectionStateErrorPolicy());
}
+ return startCuratorFramework(curatorFrameworkBuilder, fatalErrorHandler);
+ }
- CuratorFramework cf = curatorFrameworkBuilder.build();
-
+ /**
+ * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum from
+ * a builder.
+ *
+ * @param builder {@link CuratorFrameworkFactory.Builder} A builder for curatorFramework.
+ * @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected
Review comment:
```suggestion
* @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unexpected
```