XComp commented on code in PR #24563:
URL: https://github.com/apache/flink/pull/24563#discussion_r1540999613
##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
+ "with unresolvable
hostnames.")
.build());
+ public static final ConfigOption<Map<String, String>>
ZOOKEEPER_CLIENT_AUTHORIZATION =
+ key("high-availability.zookeeper.client.authorization")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Add connection authorization
Subsequent calls to this method overwrite the prior calls. "
+ + "In certain cases
ZooKeeper requires additional Authorization information. "
+ + "For example list of
valid names for ensemble in order to prevent accidentally connecting to a wrong
ensemble."
+ + "Each entry of type
Map.Entry<String, String> will be transformed "
+ + "into an AuthInfo object
with the constructor AuthInfo(String, byte[]). "
+ + "The field entry.key()
will serve as the String scheme value, while the field entry.getValue() "
+ + "will be initially
converted to a byte[] using the String#getBytes() method with %s encoding"
+ + "If not set the default
configuration for a Curator would be applied.",
+
text(ConfigConstants.DEFAULT_CHARSET.displayName()))
+ .build());
+
+ public static final ConfigOption<Duration> ZOOKEEPER_MAX_CLOSE_WAIT_MS =
+ key("high-availability.zookeeper.client.max-close-wait-ms")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Defines the time Curator should
wait during close to join background threads. "
+ + "If not set the default
configuration for a Curator would be applied.")
+ .build());
+
+ public static final ConfigOption<Integer>
ZOOKEEPER_SIMULATED_SESSION_EXP_PERCENT =
+
key("high-availability.zookeeper.client.simulated-session-expiration-percent")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The percentage set by this method
determines how and if Curator will check for session expiration. "
+ + "See Curator
documentation for %s property for more information",
Review Comment:
```suggestion
+ "See Curator
documentation for %s property for more information.",
```
##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
+ "with unresolvable
hostnames.")
.build());
+ public static final ConfigOption<Map<String, String>>
ZOOKEEPER_CLIENT_AUTHORIZATION =
+ key("high-availability.zookeeper.client.authorization")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Add connection authorization
Subsequent calls to this method overwrite the prior calls. "
+ + "In certain cases
ZooKeeper requires additional Authorization information. "
+ + "For example list of
valid names for ensemble in order to prevent accidentally connecting to a wrong
ensemble."
+ + "Each entry of type
Map.Entry<String, String> will be transformed "
+ + "into an AuthInfo object
with the constructor AuthInfo(String, byte[]). "
+ + "The field entry.key()
will serve as the String scheme value, while the field entry.getValue() "
+ + "will be initially
converted to a byte[] using the String#getBytes() method with %s encoding"
+ + "If not set the default
configuration for a Curator would be applied.",
+
text(ConfigConstants.DEFAULT_CHARSET.displayName()))
+ .build());
+
+ public static final ConfigOption<Duration> ZOOKEEPER_MAX_CLOSE_WAIT_MS =
+ key("high-availability.zookeeper.client.max-close-wait-ms")
Review Comment:
```suggestion
public static final ConfigOption<Duration> ZOOKEEPER_MAX_CLOSE_WAIT =
key("high-availability.zookeeper.client.max-close-wait")
```
The `ms" suffix is not needed anymore because we're relying on `Duration`.
##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
+ "with unresolvable
hostnames.")
.build());
+ public static final ConfigOption<Map<String, String>>
ZOOKEEPER_CLIENT_AUTHORIZATION =
+ key("high-availability.zookeeper.client.authorization")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Add connection authorization
Subsequent calls to this method overwrite the prior calls. "
+ + "In certain cases
ZooKeeper requires additional Authorization information. "
+ + "For example list of
valid names for ensemble in order to prevent accidentally connecting to a wrong
ensemble."
Review Comment:
```suggestion
+ "For example list of
valid names for ensemble in order to prevent accidentally connecting to a wrong
ensemble. "
```
##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
+ "with unresolvable
hostnames.")
.build());
+ public static final ConfigOption<Map<String, String>>
ZOOKEEPER_CLIENT_AUTHORIZATION =
+ key("high-availability.zookeeper.client.authorization")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Add connection authorization
Subsequent calls to this method overwrite the prior calls. "
+ + "In certain cases
ZooKeeper requires additional Authorization information. "
+ + "For example list of
valid names for ensemble in order to prevent accidentally connecting to a wrong
ensemble."
+ + "Each entry of type
Map.Entry<String, String> will be transformed "
+ + "into an AuthInfo object
with the constructor AuthInfo(String, byte[]). "
+ + "The field entry.key()
will serve as the String scheme value, while the field entry.getValue() "
+ + "will be initially
converted to a byte[] using the String#getBytes() method with %s encoding"
Review Comment:
```suggestion
+ "will be initially
converted to a byte[] using the String#getBytes() method with %s encoding. "
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
##########
@@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener
startCuratorFramework(
.ensembleTracker(ensembleTracking)
.aclProvider(aclProvider);
+ if
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION))
{
+ Map<String, String> authMap =
+
configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+ List<AuthInfo> authInfos =
+ authMap.entrySet().stream()
+ .map(
+ entry ->
+ new AuthInfo(
+ entry.getKey(),
+ entry.getValue()
+ .getBytes(
+
ConfigConstants
+
.DEFAULT_CHARSET)))
+ .collect(Collectors.toList());
+ curatorFrameworkBuilder.authorization(authInfos);
+ }
+
+ if
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)) {
+ curatorFrameworkBuilder.maxCloseWaitMs(
+ (int)
+ configuration
+
.get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)
+ .toMillis());
Review Comment:
```suggestion
final Duration maxCloseWaitDuration =
configuration.get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS);
final long maxCloseWaitInMillis =
maxCloseWaitDuration.toMillis();
Preconditions.checkArgument(
maxCloseWaitInMillis <= Integer.MAX_VALUE,
"The passed value of {} for {} exceeded to valid range
of {}ms.",
maxCloseWaitDuration,
HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS.key(),
Duration.ofMillis(Integer.MAX_VALUE));
curatorFrameworkBuilder.maxCloseWaitMs((int)
maxCloseWaitInMillis);
```
Can we add another safety net for the cast to fail early if (by any chance)
an unexpected value is used?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]