XComp commented on code in PR #24563:
URL: https://github.com/apache/flink/pull/24563#discussion_r1539254121


##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +222,52 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable 
hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> 
ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization 
Subsequent calls to this method overwrite the prior calls. "
+                                                    + " In certain cases 
ZooKeeper requires additional Authorization information. "
+                                                    + "For example list of 
valid names for ensemble in order to prevent the accidental connecting to a 
wrong ensemble."

Review Comment:
   ```suggestion
                                                       + "For example list of 
valid names for ensemble in order to prevent accidentally connecting to a wrong 
ensemble."
   ```
   nit



##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +222,52 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable 
hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> 
ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization 
Subsequent calls to this method overwrite the prior calls. "
+                                                    + " In certain cases 
ZooKeeper requires additional Authorization information. "
+                                                    + "For example list of 
valid names for ensemble in order to prevent the accidental connecting to a 
wrong ensemble."
+                                                    + "Each entry of type 
Map.Entry<String, String> will be transformed into an AuthInfo object with the 
constructor AuthInfo(String, byte[]). "
+                                                    + "The field entry.key() 
will serve as the String scheme value, while the field entry.getValue() will be 
initially converted to a byte[] using the String#getBytes() method with UTF8 
encoding"
+                                                    + "If not set the default 
configuration for a Curator would be applied.")
+                                    .build());
+
+    public static final ConfigOption<Integer> ZOOKEEPER_MAX_CLOSE_WAIT_MS =
+            key("high-availability.zookeeper.client.max-close-wait-ms")
+                    .intType()

Review Comment:
   ```suggestion
       public static final ConfigOption<Duration> 
ZOOKEEPER_MAX_CLOSE_WAIT_TIMEOUT =
               key("high-availability.zookeeper.client.max-close-wait")
                       .durationType()
   ```
   What about using `Duration` here? That makes it easier for the user to 
define his/her own time unit.



##########
docs/layouts/shortcodes/generated/high_availability_configuration.html:
##########
@@ -68,6 +80,12 @@
             <td>Integer</td>
             <td>Defines the session timeout for the ZooKeeper session in 
ms.</td>
         </tr>
+        <tr>
+            
<td><h5>high-availability.zookeeper.client.simulated-session-expiration-percent</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>Curator has the option of attempting to monitor session 
expiration above what is provided by ZooKeeper. The percentage set by this 
method determines how and if Curator will check for session expiration. If it 
is set to 0, Curator does not do any additional checking for session 
expiration. If a positive number is set, Curator will check for session 
expiration as follows: when ZooKeeper sends a Disconnect event, Curator will 
start a timer. If re-connection is not achieved before the elapsed time exceeds 
the negotiated session time multiplied by the session expiration percent, 
Curator will simulate a session expiration. Due to timing/network issues, it is 
not possible for a client to match the server's session timeout with complete 
accuracy. Thus, the need for a session expiration percentage.If not set the 
default configuration for a Curator would be used.</td>

Review Comment:
   I'm wondering whether we could use a link to the curator documentation 
rather than having the curator documentation being copied into the Flink 
configuration. ...just to reduce maintenance on our side. WDYT? :thinking: 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java:
##########
@@ -246,6 +249,33 @@ public static CuratorFrameworkWithUnhandledErrorListener 
startCuratorFramework(
                         .ensembleTracker(ensembleTracking)
                         .aclProvider(aclProvider);
 
+        if 
(configuration.contains(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION))
 {
+            Map<String, String> authMap =
+                    
configuration.get(HighAvailabilityOptions.ZOOKEEPER_CLIENT_AUTHORIZATION);
+            List<AuthInfo> authInfos =
+                    authMap.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            new AuthInfo(
+                                                    entry.getKey(),
+                                                    entry.getValue()
+                                                            
.getBytes(StandardCharsets.UTF_8)))

Review Comment:
   ```suggestion
                                                               
.getBytes(ConfigConstants.DEFAULT_CHARSET)))
   ```
   What about relying on Flink's default charset? That would enable us to make 
the corresponding documentation more generic.



##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +222,52 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable 
hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> 
ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization 
Subsequent calls to this method overwrite the prior calls. "
+                                                    + " In certain cases 
ZooKeeper requires additional Authorization information. "
+                                                    + "For example list of 
valid names for ensemble in order to prevent the accidental connecting to a 
wrong ensemble."
+                                                    + "Each entry of type 
Map.Entry<String, String> will be transformed into an AuthInfo object with the 
constructor AuthInfo(String, byte[]). "
+                                                    + "The field entry.key() 
will serve as the String scheme value, while the field entry.getValue() will be 
initially converted to a byte[] using the String#getBytes() method with UTF8 
encoding"

Review Comment:
   ```suggestion
                                                 + "The field entry.key() will 
serve as the String scheme value, while the field entry.getValue() will be 
initially converted to a byte[] using the String#getBytes() method with %s 
encoding"
                                                       + "If not set the 
default configuration for a Curator would be applied.",
                                               
text(ConfigConstants.DEFAULT_CHARSET.displayName()))
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +222,52 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable 
hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> 
ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization 
Subsequent calls to this method overwrite the prior calls. "
+                                                    + " In certain cases 
ZooKeeper requires additional Authorization information. "

Review Comment:
   ```suggestion
                                                       + "In certain cases 
ZooKeeper requires additional Authorization information. "
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java:
##########
@@ -220,6 +222,52 @@ public class HighAvailabilityOptions {
                                                     + "with unresolvable 
hostnames.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> 
ZOOKEEPER_CLIENT_AUTHORIZATION =
+            key("high-availability.zookeeper.client.authorization")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Add connection authorization 
Subsequent calls to this method overwrite the prior calls. "
+                                                    + " In certain cases 
ZooKeeper requires additional Authorization information. "
+                                                    + "For example list of 
valid names for ensemble in order to prevent the accidental connecting to a 
wrong ensemble."
+                                                    + "Each entry of type 
Map.Entry<String, String> will be transformed into an AuthInfo object with the 
constructor AuthInfo(String, byte[]). "
+                                                    + "The field entry.key() 
will serve as the String scheme value, while the field entry.getValue() will be 
initially converted to a byte[] using the String#getBytes() method with UTF8 
encoding"

Review Comment:
   Maybe shortening the substrings might be useful as well. Not everyone has an 
ultra weight screen. :-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to