jfrazee commented on a change in pull request #4753:
URL: https://github.com/apache/nifi/pull/4753#discussion_r561157059



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -136,13 +156,14 @@ private void startDistributed() throws IOException {
             quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit());
             quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit());
             quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier(), 
false);
-            quorumPeer.setCnxnFactory(connectionFactory);
             quorumPeer.setZKDatabase(new 
ZKDatabase(quorumPeer.getTxnFactory()));
             quorumPeer.setLearnerType(quorumPeerConfig.getPeerType());
             quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled());
             
quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs());
+            quorumPeer.setSslQuorum(quorumPeerConfig.isSslQuorum());
 
             quorumPeer.start();
+

Review comment:
       ```suggestion
   ```

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
             zkProperties.load(bis);
         }
 
-        return new ZooKeeperStateServer(zkProperties);
+        return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+    }
+
+    /**
+     * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+     * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+     * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+     * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration

Review comment:
       ```suggestion
        * @param zkProperties The zookeeper.properties file containing ZooKeeper server configuration
   ```

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
             zkProperties.load(bis);
         }
 
-        return new ZooKeeperStateServer(zkProperties);
+        return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+    }
+
+    /**
+     * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+     * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+     * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+     * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration
+     * @return A reconciled QuorumPeerConfig which will include TLS properties 
set if they are available.
+     * @throws IOException If configuration files fail to parse.
+     * @throws ConfigException If secure configuration is not as expected. 
Check administration documentation.
+     */
+    private static QuorumPeerConfig reconcileProperties(NiFiProperties 
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+        QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+
+        final boolean niFiConfigIsSecure = 
isNiFiConfigSecureForZooKeeper(niFiProperties);
+        final boolean zooKeeperConfigIsSecure = 
isZooKeeperConfigSecure(peerConfig);
+
+        if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+            logger.info("{} property is set to false or is not present, and 
zookeeper.properties file does not contain secureClientPort property, so 
embedded ZooKeeper will be started without TLS.",
+                    NiFiProperties.ZOOKEEPER_CLIENT_SECURE);
+            return peerConfig;
+        }
+
+        // If secureClientPort is set but no TLS config is set, fail to start.
+        if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+            throw new ConfigException(
+                    String.format("Zookeeper properties file %s was configured 
to be secure but there was no valid TLS config present in nifi.properties or " +
+                                  "nifi.zookeeper.client.secure was set to 
false. Check the administration guide.",
+                                   
niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES)));
+        }
+
+        // Remove any insecure ports if they were set in zookeeper.properties
+        ensureOnlySecurePortsAreEnabled(peerConfig, zkProperties);
+
+        // Set base ZooKeeper TLS server properties
+        setTlsProperties(zkProperties, new ZooKeeperServerX509Util(), 
niFiProperties);
+        // Set quorum ZooKeeper TLS server properties
+        setTlsProperties(zkProperties, new ZooKeeperQuorumX509Util(), 
niFiProperties);
+        // Set TLS client port:
+        zkProperties.setProperty("secureClientPort", 
getSecurePort(peerConfig));
+
+        // Set the required connection factory for TLS
+        zkProperties.setProperty(ZOOKEEPER_SERVER_CNXN_FACTORY, 
NettyServerCnxnFactory.class.getName());
+        zkProperties.setProperty(ZOOKEEPER_SSL_QUORUM, 
Boolean.TRUE.toString());

Review comment:
       Is there a reason these aren't in `setTlsProperties()`?

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
             zkProperties.load(bis);
         }
 
-        return new ZooKeeperStateServer(zkProperties);
+        return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+    }
+
+    /**
+     * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+     * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+     * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+     * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration
+     * @return A reconciled QuorumPeerConfig which will include TLS properties 
set if they are available.
+     * @throws IOException If configuration files fail to parse.
+     * @throws ConfigException If secure configuration is not as expected. 
Check administration documentation.
+     */
+    private static QuorumPeerConfig reconcileProperties(NiFiProperties 
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+        QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+
+        final boolean niFiConfigIsSecure = 
isNiFiConfigSecureForZooKeeper(niFiProperties);
+        final boolean zooKeeperConfigIsSecure = 
isZooKeeperConfigSecure(peerConfig);
+
+        if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+            logger.info("{} property is set to false or is not present, and 
zookeeper.properties file does not contain secureClientPort property, so 
embedded ZooKeeper will be started without TLS.",
+                    NiFiProperties.ZOOKEEPER_CLIENT_SECURE);
+            return peerConfig;
+        }
+
+        // If secureClientPort is set but no TLS config is set, fail to start.
+        if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+            throw new ConfigException(
+                    String.format("Zookeeper properties file %s was configured 
to be secure but there was no valid TLS config present in nifi.properties or " +
+                                  "nifi.zookeeper.client.secure was set to 
false. Check the administration guide.",
+                                   
niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES)));
+        }
+
+        // Remove any insecure ports if they were set in zookeeper.properties
+        ensureOnlySecurePortsAreEnabled(peerConfig, zkProperties);
+
+        // Set base ZooKeeper TLS server properties
+        setTlsProperties(zkProperties, new ZooKeeperServerX509Util(), 
niFiProperties);
+        // Set quorum ZooKeeper TLS server properties
+        setTlsProperties(zkProperties, new ZooKeeperQuorumX509Util(), 
niFiProperties);
+        // Set TLS client port:
+        zkProperties.setProperty("secureClientPort", 
getSecurePort(peerConfig));
+
+        // Set the required connection factory for TLS
+        zkProperties.setProperty(ZOOKEEPER_SERVER_CNXN_FACTORY, 
NettyServerCnxnFactory.class.getName());
+        zkProperties.setProperty(ZOOKEEPER_SSL_QUORUM, 
Boolean.TRUE.toString());
+
+        // Port unification allows both secure and insecure connections - 
setting to false means only secure connections will be allowed.
+        zkProperties.setProperty(ZOOKEEPER_PORT_UNIFICATION, 
Boolean.FALSE.toString());
+
+        // Recreate and reload the adjusted properties to ensure they're still 
valid for ZK:
+        peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+        return peerConfig;
+    }
+
+    private static boolean isZooKeeperConfigSecure(QuorumPeerConfig 
peerConfig) throws ConfigException {
+        InetSocketAddress secureAddress = 
peerConfig.getSecureClientPortAddress();
+        InetSocketAddress insecureAddress = peerConfig.getClientPortAddress();
+
+        if(secureAddress == null && insecureAddress == null) {
+            throw new ConfigException("No clientAddress or secureClientAddress 
is set in zookeeper.properties");
+        }
+
+        return secureAddress != null;
+    }
+
+    /**
+     * Verify whether the NiFi properties portion of ZooKeeper are correctly 
configured for TLS or not
+     * @param niFiProperties The loaded nifi.properties
+     * @return True if NiFi has TLS configuration and the property 
nifi.zookeeper.client.secure=true, otherwise false or configuration exception
+     * @throws ConfigException If nifi.zookeeper.client.secure=true but no TLS 
configuration is present
+     */
+    private static boolean isNiFiConfigSecureForZooKeeper(NiFiProperties 
niFiProperties) throws ConfigException {
+        final boolean istlsConfigPresent = 
niFiProperties.isZooKeeperTlsConfigurationPresent() || 
niFiProperties.isTlsConfigurationPresent();
+        final boolean isZooKeeperClientSecure = 
Boolean.parseBoolean(niFiProperties.getProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE));
+
+        if(isZooKeeperClientSecure && !istlsConfigPresent) {
+            throw new ConfigException(String.format("%s is true but no TLS 
configuration is present in nifi.properties.", 
NiFiProperties.ZOOKEEPER_CLIENT_SECURE));
+        }
+
+        if(isZooKeeperClientSecure && istlsConfigPresent) {
+            return true;
+        }
+
+        return false;

Review comment:
       ```suggestion
           return (isZooKeeperClientSecure && isTlsConfigPresent);
   ```
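
A minimal sketch of why the one-line return is safe here: once the earlier guard has thrown for "secure requested but no TLS config", the remaining `if`/`return false` pair and the single `&&` expression agree on every input. The class name, method names, and the use of `IllegalStateException` in place of `ConfigException` are assumptions made only so the example compiles on its own.

```java
import java.util.function.BinaryOperator;

public class SecureCheckDemo {

    // Shape of the code under review (IllegalStateException stands in for ConfigException).
    static boolean original(boolean clientSecure, boolean tlsPresent) {
        if (clientSecure && !tlsPresent) {
            throw new IllegalStateException("secure requested but no TLS configuration");
        }
        if (clientSecure && tlsPresent) {
            return true;
        }
        return false;
    }

    // Shape of the suggested replacement.
    static boolean simplified(boolean clientSecure, boolean tlsPresent) {
        if (clientSecure && !tlsPresent) {
            throw new IllegalStateException("secure requested but no TLS configuration");
        }
        return clientSecure && tlsPresent;
    }

    // Collapse "returned a value" and "threw" into one comparable string.
    static String outcome(BinaryOperator<Boolean> check, boolean a, boolean b) {
        try {
            return String.valueOf(check.apply(a, b));
        } catch (IllegalStateException e) {
            return "throws";
        }
    }

    // Compare both versions over all four input combinations.
    public static boolean allInputsAgree() {
        boolean[] values = {false, true};
        for (boolean a : values) {
            for (boolean b : values) {
                String before = outcome(SecureCheckDemo::original, a, b);
                String after = outcome(SecureCheckDemo::simplified, a, b);
                if (!before.equals(after)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("Both versions agree: " + allInputsAgree());
    }
}
```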

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
             zkProperties.load(bis);
         }
 
-        return new ZooKeeperStateServer(zkProperties);
+        return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+    }
+
+    /**
+     * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+     * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+     * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+     * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration
+     * @return A reconciled QuorumPeerConfig which will include TLS properties 
set if they are available.
+     * @throws IOException If configuration files fail to parse.
+     * @throws ConfigException If secure configuration is not as expected. 
Check administration documentation.
+     */
+    private static QuorumPeerConfig reconcileProperties(NiFiProperties 
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+        QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+
+        final boolean niFiConfigIsSecure = 
isNiFiConfigSecureForZooKeeper(niFiProperties);
+        final boolean zooKeeperConfigIsSecure = 
isZooKeeperConfigSecure(peerConfig);
+
+        if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+            logger.info("{} property is set to false or is not present, and 
zookeeper.properties file does not contain secureClientPort property, so 
embedded ZooKeeper will be started without TLS.",
+                    NiFiProperties.ZOOKEEPER_CLIENT_SECURE);
+            return peerConfig;
+        }
+
+        // If secureClientPort is set but no TLS config is set, fail to start.
+        if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+            throw new ConfigException(
+                    String.format("Zookeeper properties file %s was configured 
to be secure but there was no valid TLS config present in nifi.properties or " +
+                                  "nifi.zookeeper.client.secure was set to 
false. Check the administration guide.",
+                                   
niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES)));
+        }
+
+        // Remove any insecure ports if they were set in zookeeper.properties
+        ensureOnlySecurePortsAreEnabled(peerConfig, zkProperties);
+
+        // Set base ZooKeeper TLS server properties
+        setTlsProperties(zkProperties, new ZooKeeperServerX509Util(), 
niFiProperties);
+        // Set quorum ZooKeeper TLS server properties
+        setTlsProperties(zkProperties, new ZooKeeperQuorumX509Util(), 
niFiProperties);
+        // Set TLS client port:
+        zkProperties.setProperty("secureClientPort", 
getSecurePort(peerConfig));
+
+        // Set the required connection factory for TLS
+        zkProperties.setProperty(ZOOKEEPER_SERVER_CNXN_FACTORY, 
NettyServerCnxnFactory.class.getName());
+        zkProperties.setProperty(ZOOKEEPER_SSL_QUORUM, 
Boolean.TRUE.toString());
+
+        // Port unification allows both secure and insecure connections - 
setting to false means only secure connections will be allowed.
+        zkProperties.setProperty(ZOOKEEPER_PORT_UNIFICATION, 
Boolean.FALSE.toString());
+
+        // Recreate and reload the adjusted properties to ensure they're still 
valid for ZK:
+        peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+        return peerConfig;
+    }
+
+    private static boolean isZooKeeperConfigSecure(QuorumPeerConfig 
peerConfig) throws ConfigException {
+        InetSocketAddress secureAddress = 
peerConfig.getSecureClientPortAddress();
+        InetSocketAddress insecureAddress = peerConfig.getClientPortAddress();
+
+        if(secureAddress == null && insecureAddress == null) {
+            throw new ConfigException("No clientAddress or secureClientAddress 
is set in zookeeper.properties");
+        }
+
+        return secureAddress != null;
+    }
+
+    /**
+     * Verify whether the NiFi properties portion of ZooKeeper are correctly 
configured for TLS or not
+     * @param niFiProperties The loaded nifi.properties
+     * @return True if NiFi has TLS configuration and the property 
nifi.zookeeper.client.secure=true, otherwise false or configuration exception
+     * @throws ConfigException If nifi.zookeeper.client.secure=true but no TLS 
configuration is present
+     */
+    private static boolean isNiFiConfigSecureForZooKeeper(NiFiProperties 
niFiProperties) throws ConfigException {
+        final boolean istlsConfigPresent = 
niFiProperties.isZooKeeperTlsConfigurationPresent() || 
niFiProperties.isTlsConfigurationPresent();
+        final boolean isZooKeeperClientSecure = 
Boolean.parseBoolean(niFiProperties.getProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE));
+
+        if(isZooKeeperClientSecure && !istlsConfigPresent) {
+            throw new ConfigException(String.format("%s is true but no TLS 
configuration is present in nifi.properties.", 
NiFiProperties.ZOOKEEPER_CLIENT_SECURE));
+        }
+
+        if(isZooKeeperClientSecure && istlsConfigPresent) {
+            return true;
+        }
+
+        return false;
+    }
+
+    private static void ensureOnlySecurePortsAreEnabled(QuorumPeerConfig 
config, Properties zkProperties) {
+
+        // Remove plaintext client ports and addresses and warn if set, see 
NIFI-7203:
+        InetSocketAddress clientPort = config.getClientPortAddress();
+        InetSocketAddress secureClientPort = 
config.getSecureClientPortAddress();
+
+        if (clientPort != null && secureClientPort != null) {
+            zkProperties.remove("clientPort");
+            zkProperties.remove("clientPortAddress");
+            logger.warn("Invalid configuration was detected: A secure NiFi 
with an embedded ZooKeeper was configured for insecure connections. " +
+                    "Insecure ports have been removed from embedded ZooKeeper 
configuration to deactivate insecure connections.");
+        }
+    }
+
+    private static void setTlsProperties(Properties zooKeeperProperties, 
X509Util zooKeeperUtil, NiFiProperties niFiProperties) {
+        
zooKeeperProperties.setProperty(zooKeeperUtil.getSslKeystoreLocationProperty(),
+                ZooKeeperClientConfig.getPreferredProperty(niFiProperties, 
NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, NiFiProperties.SECURITY_KEYSTORE));
+        
zooKeeperProperties.setProperty(zooKeeperUtil.getSslKeystorePasswdProperty(),
+                ZooKeeperClientConfig.getPreferredProperty(niFiProperties, 
NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, 
NiFiProperties.SECURITY_KEYSTORE_PASSWD));
+        
zooKeeperProperties.setProperty(zooKeeperUtil.getSslKeystoreTypeProperty(),
+                ZooKeeperClientConfig.getPreferredProperty(niFiProperties, 
NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, 
NiFiProperties.SECURITY_KEYSTORE_TYPE));
+        
zooKeeperProperties.setProperty(zooKeeperUtil.getSslTruststoreLocationProperty(),
+                ZooKeeperClientConfig.getPreferredProperty(niFiProperties, 
NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, 
NiFiProperties.SECURITY_TRUSTSTORE));
+        
zooKeeperProperties.setProperty(zooKeeperUtil.getSslTruststorePasswdProperty(),
+                ZooKeeperClientConfig.getPreferredProperty(niFiProperties, 
NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, 
NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
+        
zooKeeperProperties.setProperty(zooKeeperUtil.getSslTruststoreTypeProperty(),
+                ZooKeeperClientConfig.getPreferredProperty(niFiProperties, 
NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, 
NiFiProperties.SECURITY_TRUSTSTORE_TYPE));
+    }
+
+    private static String getSecurePort(QuorumPeerConfig peerConfig) throws 
ConfigException {
+        final InetSocketAddress secureClientAddress = 
peerConfig.getSecureClientPortAddress();
+        String secureClientPort = null;
+
+        if (secureClientAddress != null && secureClientAddress.getPort() >= 
MIN_PORT && secureClientAddress.getPort() <= MAX_PORT) {
+            secureClientPort = String.valueOf(secureClientAddress.getPort());
+            logger.debug("Secure client port retrieved from ZooKeeper 
configuration: {}", secureClientPort);

Review comment:
       ```suggestion
               if (logger.isDebugEnabled()) {
                   logger.debug("Secure client port retrieved from ZooKeeper configuration: {}", secureClientPort);
               }
   ```

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -122,11 +135,18 @@ private void startDistributed() throws IOException {
 
         try {
             transactionLog = new 
FileTxnSnapLog(quorumPeerConfig.getDataLogDir(), quorumPeerConfig.getDataDir());
-
             connectionFactory = ServerCnxnFactory.createFactory();
-            
connectionFactory.configure(quorumPeerConfig.getClientPortAddress(), 
quorumPeerConfig.getMaxClientCnxns());
+            
connectionFactory.configure(getAvailableSocketAddress(quorumPeerConfig), 
quorumPeerConfig.getMaxClientCnxns(), quorumPeerConfig.isSslQuorum());
 
             quorumPeer = new QuorumPeer();
+
+            // Set the secure connection factory if the quorum is supposed to 
be secure.
+            if(quorumPeerConfig.isSslQuorum()) {

Review comment:
       Consistency nit: add a space between `if` and `(`. The same spacing issue appears elsewhere in this change.
   ```suggestion
               if (quorumPeerConfig.isSslQuorum()) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -198,6 +219,144 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
             zkProperties.load(bis);
         }
 
-        return new ZooKeeperStateServer(zkProperties);
+        return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+    }
+
+    /**
+     * Reconcile properties between the nifi.properties and 
zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties 
are derived from
+     * the zookeeper.properties file, while the TLS key/truststore properties 
are taken from nifi.properties.
+     * @param niFiProperties NiFiProperties file containing ZooKeeper client 
and TLS configuration
+     * @param zkProperties The zookeeper.properties file containing Zookeeper 
server configuration
+     * @return A reconciled QuorumPeerConfig which will include TLS properties 
set if they are available.
+     * @throws IOException If configuration files fail to parse.
+     * @throws ConfigException If secure configuration is not as expected. 
Check administration documentation.
+     */
+    private static QuorumPeerConfig reconcileProperties(NiFiProperties 
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+        QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+
+        final boolean niFiConfigIsSecure = 
isNiFiConfigSecureForZooKeeper(niFiProperties);
+        final boolean zooKeeperConfigIsSecure = 
isZooKeeperConfigSecure(peerConfig);
+
+        if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+            logger.info("{} property is set to false or is not present, and 
zookeeper.properties file does not contain secureClientPort property, so 
embedded ZooKeeper will be started without TLS.",
+                    NiFiProperties.ZOOKEEPER_CLIENT_SECURE);
+            return peerConfig;
+        }
+
+        // If secureClientPort is set but no TLS config is set, fail to start.
+        if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
+            throw new ConfigException(
+                    String.format("Zookeeper properties file %s was configured 
to be secure but there was no valid TLS config present in nifi.properties or " +
+                                  "nifi.zookeeper.client.secure was set to 
false. Check the administration guide.",
+                                   
niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES)));
+        }
+
+        // Remove any insecure ports if they were set in zookeeper.properties
+        ensureOnlySecurePortsAreEnabled(peerConfig, zkProperties);
+
+        // Set base ZooKeeper TLS server properties
+        setTlsProperties(zkProperties, new ZooKeeperServerX509Util(), 
niFiProperties);
+        // Set quorum ZooKeeper TLS server properties
+        setTlsProperties(zkProperties, new ZooKeeperQuorumX509Util(), 
niFiProperties);
+        // Set TLS client port:
+        zkProperties.setProperty("secureClientPort", 
getSecurePort(peerConfig));
+
+        // Set the required connection factory for TLS
+        zkProperties.setProperty(ZOOKEEPER_SERVER_CNXN_FACTORY, 
NettyServerCnxnFactory.class.getName());
+        zkProperties.setProperty(ZOOKEEPER_SSL_QUORUM, 
Boolean.TRUE.toString());
+
+        // Port unification allows both secure and insecure connections - 
setting to false means only secure connections will be allowed.
+        zkProperties.setProperty(ZOOKEEPER_PORT_UNIFICATION, 
Boolean.FALSE.toString());
+
+        // Recreate and reload the adjusted properties to ensure they're still 
valid for ZK:
+        peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+        return peerConfig;
+    }
+
+    private static boolean isZooKeeperConfigSecure(QuorumPeerConfig 
peerConfig) throws ConfigException {
+        InetSocketAddress secureAddress = 
peerConfig.getSecureClientPortAddress();
+        InetSocketAddress insecureAddress = peerConfig.getClientPortAddress();
+
+        if(secureAddress == null && insecureAddress == null) {
+            throw new ConfigException("No clientAddress or secureClientAddress 
is set in zookeeper.properties");
+        }
+
+        return secureAddress != null;
+    }
+
+    /**
+     * Verify whether the NiFi properties portion of ZooKeeper are correctly 
configured for TLS or not
+     * @param niFiProperties The loaded nifi.properties
+     * @return True if NiFi has TLS configuration and the property 
nifi.zookeeper.client.secure=true, otherwise false or configuration exception
+     * @throws ConfigException If nifi.zookeeper.client.secure=true but no TLS 
configuration is present
+     */
+    private static boolean isNiFiConfigSecureForZooKeeper(NiFiProperties 
niFiProperties) throws ConfigException {
+        final boolean istlsConfigPresent = 
niFiProperties.isZooKeeperTlsConfigurationPresent() || 
niFiProperties.isTlsConfigurationPresent();
+        final boolean isZooKeeperClientSecure = 
Boolean.parseBoolean(niFiProperties.getProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE));

Review comment:
       There's an `isZooKeeperClientSecure()` in `NiFiProperties` to avoid 
unexpected behavior with `parseBoolean()`.
   ```suggestion
           final boolean isTlsConfigPresent = niFiProperties.isZooKeeperTlsConfigurationPresent() || niFiProperties.isTlsConfigurationPresent();
           boolean isZooKeeperClientSecure = niFiProperties.isZooKeeperClientSecure();
   ```
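
For context on the `parseBoolean()` concern, here is a small standalone sketch of how `Boolean.parseBoolean` treats values that might plausibly appear in a properties file. It returns `true` only for a non-null string equal to "true" ignoring case, and silently returns `false` for everything else, including padded or misspelled values. The `strictIsTrue` helper is hypothetical and is not a claim about how `NiFiProperties.isZooKeeperClientSecure()` is implemented.

```java
public class ParseBooleanDemo {

    // Hypothetical stricter check, for illustration only: trims whitespace before comparing.
    public static boolean strictIsTrue(String raw) {
        return raw != null && "true".equalsIgnoreCase(raw.trim());
    }

    public static void main(String[] args) {
        // Values a user might type into nifi.properties.
        String[] samples = {"true", "TRUE", " true ", "yes", "1", ""};

        for (String sample : samples) {
            System.out.printf("parseBoolean(\"%s\") = %b, strictIsTrue = %b%n",
                    sample, Boolean.parseBoolean(sample), strictIsTrue(sample));
        }

        // A missing property comes back as null; parseBoolean treats it as false without complaint.
        System.out.println("parseBoolean(null) = " + Boolean.parseBoolean(null));
    }
}
```

The point of the sketch is only that a typo such as `" true "` or `yes` silently disables the secure path when parsed this way, which is the kind of surprise a dedicated accessor can guard against.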




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

