szilard-nemeth commented on code in PR #5638:
URL: https://github.com/apache/hadoop/pull/5638#discussion_r1199527654


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java:
##########
@@ -503,4 +644,50 @@ private void setJaasConfiguration(ZKClientConfig 
zkClientConfig) throws IOExcept
       zkClientConfig.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, 
JAAS_CLIENT_ENTRY);
     }
   }
-}
\ No newline at end of file
+
+  /**
+   * Helper class to contain the Truststore/Keystore paths for the ZK client 
connection over
+   * SSL/TLS.
+   */
+  public static class TruststoreKeystore{
+    private static String keystoreLocation;
+    private static String keystorePassword;
+    private static String truststoreLocation;
+    private static String truststorePassword;
+    /** Configuration for the ZooKeeper connection when SSL/TLS is enabled.
+     * When a value is not configured, ensure that empty string is set instead 
of null.
+     * @param conf ZooKeeper Client configuration
+     */
+    public TruststoreKeystore(Configuration conf){
+
+      keystoreLocation =
+          
StringUtils.defaultString(conf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION,

Review Comment:
   Why is StringUtils.defaultString needed here?
   conf.get() will already return an empty string if the config is not found,
   given that you pass an empty string as the default value in all of the conf.get calls.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,

Review Comment:
   Extracting these (at least 1, 100, and 10) to static finals would make this 
more readable and straightforward.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java:
##########
@@ -452,21 +502,50 @@ public static class HadoopZookeeperFactory implements 
ZookeeperFactory {
     private final String zkPrincipal;
     private final String kerberosPrincipal;
     private final String kerberosKeytab;
+    private final Boolean sslEnabled;
 
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client 
connection.
+     * @param zkPrincipal Optional.
+     */
     public HadoopZookeeperFactory(String zkPrincipal) {
       this(zkPrincipal, null, null);
     }
-
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client 
connection.
+     * @param zkPrincipal Optional.
+     * @param kerberosPrincipal Optional. Use along with kerberosKeytab.
+     * @param kerberosKeytab Optional. Use along with kerberosPrincipal.
+     */
     public HadoopZookeeperFactory(String zkPrincipal, String kerberosPrincipal,
         String kerberosKeytab) {
+      this(zkPrincipal, kerberosPrincipal, kerberosKeytab, false,
+          new TruststoreKeystore(new Configuration())); }
+
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client 
connection.
+     * @param zkPrincipal Optional.
+     * @param kerberosPrincipal Optional. Use along with kerberosKeytab.
+     * @param kerberosKeytab Optional. Use along with kerberosPrincipal.
+     * @param sslEnabled Flag to enable SSL/TLS ZK client connection for each 
component
+     *                   independently.
+     * @param truststoreKeystore TruststoreKeystore object containing the 
keystoreLocation,
+     *                           keystorePassword, truststoreLocation, 
truststorePassword for
+     *                           SSL/TLS connection when sslEnabled is set to 
true.
+     */
+    public HadoopZookeeperFactory(String zkPrincipal,
+        String kerberosPrincipal,
+        String kerberosKeytab,
+        boolean sslEnabled,
+        TruststoreKeystore truststoreKeystore) {

Review Comment:
   The parameter called 'truststoreKeystore' is unused.
   Please remove it.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, 
this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return 
setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws 
Exception {
+    Configuration conf = new Configuration();
+    System.setProperty("zookeeper.serverCnxnFactory", 
"org.apache.zookeeper.server" +
+        ".NettyServerCnxnFactory");
+
+    System.setProperty("zookeeper.ssl.keyStore.location", testDataPath + 
"keystore.jks");
+    System.setProperty("zookeeper.ssl.keyStore.password", "password");
+    System.setProperty("zookeeper.ssl.trustStore.location", testDataPath + 
"truststore.jks");
+    System.setProperty("zookeeper.ssl.trustStore.password", "password");
+    System.setProperty("zookeeper.request.timeout", "12345");
+
+    System.setProperty("jute.maxbuffer", JUTE_MAXBUFFER.toString());
+
+    System.setProperty("javax.net.debug", "ssl");
+    System.setProperty("zookeeper.authProvider.x509", 
"org.apache.zookeeper.server.auth" +
+        ".X509AuthenticationProvider");
+
+
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, testDataPath +
+        "keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "password");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, testDataPath +
+        "truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "password");
+    return conf;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.curator.close();
+    if (this.server != null) {
+      this.server.close();
+      this.server = null;
+    }
+  }
+
+  @Test
+  public void testSecureZKConfiguration() throws Exception {
+    LOG.info("Entered to the testSecureZKConfiguration test case.");
+    // Validate that HadoopZooKeeperFactory will set ZKConfig with given 
principals
+    ZKCuratorManager.HadoopZookeeperFactory factory =
+        new ZKCuratorManager.HadoopZookeeperFactory(
+            null,
+            null,
+            null,
+            true,
+            new ZKCuratorManager.TruststoreKeystore(hadoopConf));
+    ZooKeeper zk = factory.newZooKeeper(this.server.getConnectString(), 1000, 
null, false);
+    validateSSLConfiguration(
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD),
+        
this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION),
+        
this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD),
+        zk);
+  }
+
+  private void validateSSLConfiguration(
+      String keystoreLocation,
+      String keystorePassword,
+      String truststoreLocation,
+      String truststorePassword,
+      ZooKeeper zk){
+    try (ClientX509Util x509Util = new ClientX509Util()) {
+      //testing if custom values are set properly
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
keystoreLocation,
+          
zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
keystorePassword,
+          
zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
truststoreLocation,
+          
zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
truststorePassword,
+          
zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty()));
+    }
+    //testing if constant values hardcoded into the code are set properly
+    assertEquals("Validate that expected clientConfig is set in ZK config", 
"true",
+        zk.getClientConfig().getProperty(ZKClientConfig.SECURE_CLIENT));
+    assertEquals("Validate that expected clientConfig is set in ZK config",
+        "org.apache.zookeeper.ClientCnxnSocketNetty",
+        
zk.getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET));
+  }
+
+  @Test
+  public void testTruststoreKeystoreConfiguration(){
+    LOG.info("Entered to the testTruststoreKeystoreConfiguration test case.");
+    /**

Review Comment:
   nit: This should be a block comment (/* ... */); as written, it is a dangling Javadoc comment.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, 
this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return 
setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws 
Exception {
+    Configuration conf = new Configuration();
+    System.setProperty("zookeeper.serverCnxnFactory", 
"org.apache.zookeeper.server" +
+        ".NettyServerCnxnFactory");
+
+    System.setProperty("zookeeper.ssl.keyStore.location", testDataPath + 
"keystore.jks");
+    System.setProperty("zookeeper.ssl.keyStore.password", "password");
+    System.setProperty("zookeeper.ssl.trustStore.location", testDataPath + 
"truststore.jks");
+    System.setProperty("zookeeper.ssl.trustStore.password", "password");
+    System.setProperty("zookeeper.request.timeout", "12345");
+
+    System.setProperty("jute.maxbuffer", JUTE_MAXBUFFER.toString());
+
+    System.setProperty("javax.net.debug", "ssl");
+    System.setProperty("zookeeper.authProvider.x509", 
"org.apache.zookeeper.server.auth" +
+        ".X509AuthenticationProvider");
+
+
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, testDataPath +
+        "keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "password");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, testDataPath +
+        "truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "password");
+    return conf;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.curator.close();
+    if (this.server != null) {
+      this.server.close();
+      this.server = null;
+    }
+  }
+
+  @Test
+  public void testSecureZKConfiguration() throws Exception {
+    LOG.info("Entered to the testSecureZKConfiguration test case.");
+    // Validate that HadoopZooKeeperFactory will set ZKConfig with given 
principals
+    ZKCuratorManager.HadoopZookeeperFactory factory =
+        new ZKCuratorManager.HadoopZookeeperFactory(
+            null,
+            null,
+            null,
+            true,
+            new ZKCuratorManager.TruststoreKeystore(hadoopConf));
+    ZooKeeper zk = factory.newZooKeeper(this.server.getConnectString(), 1000, 
null, false);
+    validateSSLConfiguration(
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD),
+        
this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION),
+        
this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD),
+        zk);
+  }
+
+  private void validateSSLConfiguration(
+      String keystoreLocation,
+      String keystorePassword,
+      String truststoreLocation,
+      String truststorePassword,
+      ZooKeeper zk){
+    try (ClientX509Util x509Util = new ClientX509Util()) {
+      //testing if custom values are set properly
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
keystoreLocation,
+          
zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
keystorePassword,
+          
zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
truststoreLocation,
+          
zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
truststorePassword,
+          
zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty()));
+    }
+    //testing if constant values hardcoded into the code are set properly
+    assertEquals("Validate that expected clientConfig is set in ZK config", 
"true",
+        zk.getClientConfig().getProperty(ZKClientConfig.SECURE_CLIENT));
+    assertEquals("Validate that expected clientConfig is set in ZK config",
+        "org.apache.zookeeper.ClientCnxnSocketNetty",
+        
zk.getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET));
+  }
+
+  @Test
+  public void testTruststoreKeystoreConfiguration(){
+    LOG.info("Entered to the testTruststoreKeystoreConfiguration test case.");
+    /**
+     * By default the truststore/keystore configurations are not set, hence 
the values are null.
+     * Validate that the null values are converted into empty strings by the 
class.
+     */
+    Configuration conf = new Configuration();
+    new ZKCuratorManager.TruststoreKeystore(conf);
+
+    assertEquals("Validate that null value is converted to empty string.", "",
+        ZKCuratorManager.TruststoreKeystore.getKeystoreLocation());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        ZKCuratorManager.TruststoreKeystore.getKeystorePassword());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        ZKCuratorManager.TruststoreKeystore.getTruststoreLocation());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        ZKCuratorManager.TruststoreKeystore.getTruststorePassword());
+
+    //Validate that non-null values will remain intact
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, 
"/keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, 
"keystorePassword");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, 
"/truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, 
"truststorePassword");
+    new ZKCuratorManager.TruststoreKeystore(conf);

Review Comment:
   It doesn't make much sense to invoke the constructor of TruststoreKeystore
   and store all the values from the config in static fields.
   Static fields are not really meant for this.
   I would just convert all 4 static fields to instance fields in
   TruststoreKeystore.
   Funnily enough, I only noticed this weirdness from the usage of this class here
   in the testcase.
   So basically, the change here would be to create a new instance of
   TruststoreKeystore with the conf and call the same getters on that object,
   rather than in a static fashion as you do currently.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, 
this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return 
setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws 
Exception {
+    Configuration conf = new Configuration();
+    System.setProperty("zookeeper.serverCnxnFactory", 
"org.apache.zookeeper.server" +
+        ".NettyServerCnxnFactory");
+
+    System.setProperty("zookeeper.ssl.keyStore.location", testDataPath + 
"keystore.jks");
+    System.setProperty("zookeeper.ssl.keyStore.password", "password");
+    System.setProperty("zookeeper.ssl.trustStore.location", testDataPath + 
"truststore.jks");
+    System.setProperty("zookeeper.ssl.trustStore.password", "password");
+    System.setProperty("zookeeper.request.timeout", "12345");
+
+    System.setProperty("jute.maxbuffer", JUTE_MAXBUFFER.toString());
+
+    System.setProperty("javax.net.debug", "ssl");
+    System.setProperty("zookeeper.authProvider.x509", 
"org.apache.zookeeper.server.auth" +
+        ".X509AuthenticationProvider");
+
+
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, testDataPath +
+        "keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "password");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, testDataPath +
+        "truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "password");
+    return conf;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.curator.close();
+    if (this.server != null) {
+      this.server.close();
+      this.server = null;
+    }
+  }
+
+  @Test
+  public void testSecureZKConfiguration() throws Exception {
+    LOG.info("Entered to the testSecureZKConfiguration test case.");
+    // Validate that HadoopZooKeeperFactory will set ZKConfig with given 
principals
+    ZKCuratorManager.HadoopZookeeperFactory factory =
+        new ZKCuratorManager.HadoopZookeeperFactory(
+            null,
+            null,
+            null,
+            true,
+            new ZKCuratorManager.TruststoreKeystore(hadoopConf));
+    ZooKeeper zk = factory.newZooKeeper(this.server.getConnectString(), 1000, 
null, false);
+    validateSSLConfiguration(
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD),
+        
this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION),
+        
this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD),
+        zk);
+  }
+
+  private void validateSSLConfiguration(
+      String keystoreLocation,
+      String keystorePassword,
+      String truststoreLocation,
+      String truststorePassword,
+      ZooKeeper zk){
+    try (ClientX509Util x509Util = new ClientX509Util()) {
+      //testing if custom values are set properly
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
keystoreLocation,
+          
zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
keystorePassword,
+          
zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
truststoreLocation,
+          
zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
truststorePassword,
+          
zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty()));
+    }
+    //testing if constant values hardcoded into the code are set properly
+    assertEquals("Validate that expected clientConfig is set in ZK config", 
"true",

Review Comment:
   Nit: "true" --> Boolean.TRUE.toString()



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;

Review Comment:
   This could be a primitive int.
   Moreover, the name 'defaultValue' does not tell me anything.
   Please add 2 static final ints and give them names that reflect the quorum port and
   election port.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, 
this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {

Review Comment:
   Maybe setUpSecureConfig is a better name.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java:
##########
@@ -478,10 +557,72 @@ public ZooKeeper newZooKeeper(String connectString, int 
sessionTimeout,
       if (zkClientConfig.isSaslClientEnabled() && 
!isJaasConfigurationSet(zkClientConfig)) {
         setJaasConfiguration(zkClientConfig);
       }
+      if (sslEnabled) {
+        setSslConfiguration(zkClientConfig);
+      }
       return new ZooKeeper(connectString, sessionTimeout, watcher,
           canBeReadOnly, zkClientConfig);
     }
 
+    /**
+     * Configure ZooKeeper Client with SSL/TLS connection.
+     * @param zkClientConfig ZooKeeper Client configuration
+     * */
+    private void setSslConfiguration(ZKClientConfig zkClientConfig) throws 
ConfigurationException {
+      this.setSslConfiguration(zkClientConfig, new ClientX509Util());
+    }
+
+    private void setSslConfiguration(ZKClientConfig zkClientConfig, 
ClientX509Util x509Util)
+        throws ConfigurationException {
+      validateSslConfiguration(
+          TruststoreKeystore.keystoreLocation,
+          TruststoreKeystore.keystorePassword,
+          TruststoreKeystore.truststoreLocation,
+          TruststoreKeystore.truststorePassword);
+      LOG.info("Configuring the ZooKeeper client to use SSL/TLS encryption for 
connecting to the " +
+          "ZooKeeper server.");
+      LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
+          TruststoreKeystore.keystoreLocation,
+          CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION);
+      LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
+          TruststoreKeystore.truststoreLocation,
+          CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION);
+
+      zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+      zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+          "org.apache.zookeeper.ClientCnxnSocketNetty");
+      zkClientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(),
+          TruststoreKeystore.keystoreLocation);
+      zkClientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(),
+          TruststoreKeystore.keystorePassword);
+      zkClientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(),
+          TruststoreKeystore.truststoreLocation);
+      zkClientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(),
+          TruststoreKeystore.truststorePassword);
+    }
+
+    private void validateSslConfiguration(String keystoreLocation, String 
keystorePassword,

Review Comment:
   All parameters are unused in this method. Please remove them.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, 
this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return 
setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws 
Exception {
+    Configuration conf = new Configuration();
+    System.setProperty("zookeeper.serverCnxnFactory", 
"org.apache.zookeeper.server" +

Review Comment:
   Nit: Getting the name of the class from the Class object of 
NettyServerCnxnFactory would be cleaner.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, 
this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return 
setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws 
Exception {
+    Configuration conf = new Configuration();
+    System.setProperty("zookeeper.serverCnxnFactory", 
"org.apache.zookeeper.server" +
+        ".NettyServerCnxnFactory");
+
+    System.setProperty("zookeeper.ssl.keyStore.location", testDataPath + 
"keystore.jks");
+    System.setProperty("zookeeper.ssl.keyStore.password", "password");
+    System.setProperty("zookeeper.ssl.trustStore.location", testDataPath + 
"truststore.jks");
+    System.setProperty("zookeeper.ssl.trustStore.password", "password");
+    System.setProperty("zookeeper.request.timeout", "12345");
+
+    System.setProperty("jute.maxbuffer", JUTE_MAXBUFFER.toString());
+
+    System.setProperty("javax.net.debug", "ssl");
+    System.setProperty("zookeeper.authProvider.x509", 
"org.apache.zookeeper.server.auth" +
+        ".X509AuthenticationProvider");
+
+
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, testDataPath +
+        "keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "password");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, testDataPath +
+        "truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "password");
+    return conf;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.curator.close();
+    if (this.server != null) {
+      this.server.close();
+      this.server = null;
+    }
+  }
+
+  @Test
+  public void testSecureZKConfiguration() throws Exception {
+    LOG.info("Entered to the testSecureZKConfiguration test case.");
+    // Validate that HadoopZooKeeperFactory will set ZKConfig with given 
principals
+    ZKCuratorManager.HadoopZookeeperFactory factory =
+        new ZKCuratorManager.HadoopZookeeperFactory(
+            null,
+            null,
+            null,
+            true,
+            new ZKCuratorManager.TruststoreKeystore(hadoopConf));
+    ZooKeeper zk = factory.newZooKeeper(this.server.getConnectString(), 1000, 
null, false);
+    validateSSLConfiguration(
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD),
+        
this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION),
+        
this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD),
+        zk);
+  }
+
+  private void validateSSLConfiguration(
+      String keystoreLocation,
+      String keystorePassword,
+      String truststoreLocation,
+      String truststorePassword,
+      ZooKeeper zk){
+    try (ClientX509Util x509Util = new ClientX509Util()) {
+      //testing if custom values are set properly
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
keystoreLocation,
+          
zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
keystorePassword,
+          
zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
truststoreLocation,
+          
zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", 
truststorePassword,
+          
zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty()));
+    }
+    //testing if constant values hardcoded into the code are set properly
+    assertEquals("Validate that expected clientConfig is set in ZK config", 
"true",
+        zk.getClientConfig().getProperty(ZKClientConfig.SECURE_CLIENT));
+    assertEquals("Validate that expected clientConfig is set in ZK config",
+        "org.apache.zookeeper.ClientCnxnSocketNetty",

Review Comment:
   Nit: Getting the name from the Class object will be cleaner.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK 
server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, 
this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return 
setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws 
Exception {

Review Comment:
   This method does not throw any Exception according to IntelliJ.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to