XComp commented on code in PR #24563:
URL: https://github.com/apache/flink/pull/24563#discussion_r1540999613


##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization Subsequent calls to this method overwrite the prior calls. "
+                                                    + "In certain cases ZooKeeper requires additional Authorization information. "
+                                                    + "For example list of valid names for ensemble in order to prevent accidentally connecting to a wrong ensemble."
+                                                    + "Each entry of type Map.Entry<String, String> will be transformed "
+                                                    + "into an AuthInfo object with the constructor AuthInfo(String, byte[]). "
+                                                    + "The field entry.key() will serve as the String scheme value, while the field entry.getValue() "
+                                                    + "will be initially converted to a byte[] using the String#getBytes() method with %s encoding"
+                                                    + "If not set the default configuration for a Curator would be applied.",
+                                            text(ConfigConstants.DEFAULT_CHARSET.displayName()))
+                                    .build());
+
+    public static final ConfigOption<Duration> ZOOKEEPER_MAX_CLOSE_WAIT_MS =
+            key("high-availability.zookeeper.client.max-close-wait-ms")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines the time Curator should wait during close to join background threads. "
+                                                    + "If not set the default configuration for a Curator would be applied.")
+                                    .build());
+
+    public static final ConfigOption<Integer> ZOOKEEPER_SIMULATED_SESSION_EXP_PERCENT =
+            key("high-availability.zookeeper.client.simulated-session-expiration-percent")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The percentage set by this method determines how and if Curator will check for session expiration. "
+                                                    + "See Curator documentation for %s property for more information",

Review Comment:
   ```suggestion
                                                       + "See Curator documentation for %s property for more information.",
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization Subsequent calls to this method overwrite the prior calls. "
+                                                    + "In certain cases ZooKeeper requires additional Authorization information. "
+                                                    + "For example list of valid names for ensemble in order to prevent accidentally connecting to a wrong ensemble."
+                                                    + "Each entry of type Map.Entry<String, String> will be transformed "
+                                                    + "into an AuthInfo object with the constructor AuthInfo(String, byte[]). "
+                                                    + "The field entry.key() will serve as the String scheme value, while the field entry.getValue() "
+                                                    + "will be initially converted to a byte[] using the String#getBytes() method with %s encoding"
+                                                    + "If not set the default configuration for a Curator would be applied.",
+                                            text(ConfigConstants.DEFAULT_CHARSET.displayName()))
+                                    .build());
+
+    public static final ConfigOption<Duration> ZOOKEEPER_MAX_CLOSE_WAIT_MS =
+            key("high-availability.zookeeper.client.max-close-wait-ms")

Review Comment:
   ```suggestion
       public static final ConfigOption<Duration> ZOOKEEPER_MAX_CLOSE_WAIT =
               key("high-availability.zookeeper.client.max-close-wait")
   ```
   The `ms` suffix is not needed anymore because we're relying on `Duration`.
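
To illustrate the point about the suffix: a `java.time.Duration` already carries its own unit, so the same length of time can be written in seconds, milliseconds, or ISO-8601 form and still compare equal. The sketch below uses only the JDK; the class `DurationUnits` and the helper `closeWaitMillis` are hypothetical names and not part of the PR.

```java
import java.time.Duration;

public class DurationUnits {

    // Hypothetical helper: derives the millisecond value only at the point
    // where an int/long-based API actually needs it.
    static long closeWaitMillis(Duration configured) {
        return configured.toMillis();
    }

    public static void main(String[] args) {
        // Three ways of expressing the same length of time.
        Duration fromSeconds = Duration.ofSeconds(30);
        Duration fromMillis = Duration.ofMillis(30_000);
        Duration fromIso = Duration.parse("PT30S");

        // The unit is part of the value, not part of the option name.
        System.out.println(fromSeconds.equals(fromMillis));
        System.out.println(fromSeconds.equals(fromIso));
        System.out.println(closeWaitMillis(fromIso));
    }
}
```

Because the unit travels with the value, a key ending in `-ms` would suggest a constraint that the option type does not actually impose.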



##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization Subsequent calls to this method overwrite the prior calls. "
+                                                    + "In certain cases ZooKeeper requires additional Authorization information. "
+                                                    + "For example list of valid names for ensemble in order to prevent accidentally connecting to a wrong ensemble."

Review Comment:
   ```suggestion
                                                       + "For example list of valid names for ensemble in order to prevent accidentally connecting to a wrong ensemble. "
   ```
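
The trailing space matters because adjacent string literals are joined verbatim, so the rendered description would otherwise read `ensemble.Each entry`. A minimal sketch of the effect, using fragments taken from the description above; the class `DescriptionSpacing` and its `join` helper are hypothetical and only mimic what the `+` operator does:

```java
public class DescriptionSpacing {

    // Hypothetical helper: concatenates fragments exactly like `+` on literals.
    static String join(String... fragments) {
        return String.join("", fragments);
    }

    public static void main(String[] args) {
        String next = "Each entry of type Map.Entry<String, String> will be transformed ";

        // Without the trailing space the two sentences run together.
        System.out.println(join("connecting to a wrong ensemble.", next));

        // With the trailing space the sentences are separated as intended.
        System.out.println(join("connecting to a wrong ensemble. ", next));
    }
}
```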



##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +225,50 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization Subsequent calls to this method overwrite the prior calls. "
+                                                    + "In certain cases ZooKeeper requires additional Authorization information. "
+                                                    + "For example list of valid names for ensemble in order to prevent accidentally connecting to a wrong ensemble."
+                                                    + "Each entry of type Map.Entry<String, String> will be transformed "
+                                                    + "into an AuthInfo object with the constructor AuthInfo(String, byte[]). "
+                                                    + "The field entry.key() will serve as the String scheme value, while the field entry.getValue() "
+                                                    + "will be initially converted to a byte[] using the String#getBytes() method with %s encoding"

Review Comment:
   ```suggestion
                                                       + "will be initially converted to a byte[] using the String#getBytes() method with %s encoding. "
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
##########
@@ -246,6 +249,38 @@ public static CuratorFrameworkWithUnhandledErrorListener startCuratorFramework(
                         .ensembleTracker(ensembleTracking)
                         .aclProvider(aclProvider);
 
+        if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION)) {
+            Map<String, String> authMap =
+                    configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+            List<AuthInfo> authInfos =
+                    authMap.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            new AuthInfo(
+                                                    entry.getKey(),
+                                                    entry.getValue()
+                                                            .getBytes(
+                                                                    ConfigConstants
+                                                                            .DEFAULT_CHARSET)))
+                            .collect(Collectors.toList());
+            curatorFrameworkBuilder.authorization(authInfos);
+        }
+
+        if (configuration.contains(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)) {
+            curatorFrameworkBuilder.maxCloseWaitMs(
+                    (int)
+                            configuration
+                                    .get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS)
+                                    .toMillis());

Review Comment:
   ```suggestion
               final Duration maxCloseWaitDuration =
                       configuration.get(HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS);
               final long maxCloseWaitInMillis = maxCloseWaitDuration.toMillis();
               Preconditions.checkArgument(
                       maxCloseWaitInMillis <= Integer.MAX_VALUE,
                       "The passed value of %s for %s exceeds the valid range of %s.",
                       maxCloseWaitDuration,
                       HighAvailabilityOptions.ZOOKEEPER_MAX_CLOSE_WAIT_MS.key(),
                       Duration.ofMillis(Integer.MAX_VALUE));
               curatorFrameworkBuilder.maxCloseWaitMs((int) maxCloseWaitInMillis);
   ```
   Can we add another safety net so that the cast fails early if (by any chance) an unexpected value is used?
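
For context on why the range check matters: a plain `(int)` cast of a `long` does not fail on overflow, it silently wraps around. The following JDK-only sketch shows a checked conversion next to the unchecked cast. The class `CloseWaitConversion` and the method `toCloseWaitMillis` are hypothetical names, and the additional rejection of negative durations is an assumption that goes beyond the upper-bound check suggested above.

```java
import java.time.Duration;

public class CloseWaitConversion {

    // Converts a Duration to whole milliseconds and fails early when the
    // value does not fit into a non-negative int.
    // Assumption: negative durations are treated as invalid as well.
    static int toCloseWaitMillis(Duration maxCloseWait) {
        final long millis = maxCloseWait.toMillis();
        if (millis < 0 || millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    String.format(
                            "The passed value of %s is outside the valid range of [%s, %s].",
                            maxCloseWait, Duration.ZERO, Duration.ofMillis(Integer.MAX_VALUE)));
        }
        return (int) millis;
    }

    public static void main(String[] args) {
        // A value well within range converts without loss.
        System.out.println(toCloseWaitMillis(Duration.ofSeconds(5)));

        // 30 days in milliseconds exceeds Integer.MAX_VALUE, so an unchecked
        // cast wraps around to a negative number instead of failing.
        final Duration tooLarge = Duration.ofDays(30);
        System.out.println((int) tooLarge.toMillis());

        try {
            toCloseWaitMillis(tooLarge);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

`Math.toIntExact(long)` would be a shorter alternative, but it throws an `ArithmeticException` with a generic message, whereas the explicit check can name the offending value and the allowed range.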


