jfrazee commented on a change in pull request #4216:
URL: https://github.com/apache/nifi/pull/4216#discussion_r418709863



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -198,6 +217,93 @@ public static ZooKeeperStateServer create(final 
NiFiProperties properties) throw
             zkProperties.load(bis);
         }
 
-        return new ZooKeeperStateServer(zkProperties);
+        return new ZooKeeperStateServer(reconcileProperties(properties, 
zkProperties));
+    }
+
+    private static QuorumPeerConfig reconcileProperties(NiFiProperties 
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+        QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+
+        // If this is an insecure NiFi no changes are needed:
+        if (!niFiProperties.isHTTPSConfigured()) {
+            logger.info("NiFi properties not mapped to ZooKeeper properties 
because NiFi is insecure.");
+            return peerConfig;
+        }
+
+        // Remove HTTP client ports and addresses and warn if set, see 
NIFI-7203:
+        InetSocketAddress clientPort = peerConfig.getClientPortAddress();
+        if (clientPort != null) {
+            zkProperties.remove("clientPort");
+            zkProperties.remove("clientPortAddress");
+            logger.warn("Invalid configuration detected: secure NiFi with 
embedded ZooKeeper configured for unsecured HTTP connections.");
+            logger.warn("Removed HTTP port from embedded ZooKeeper 
configuration to deactivate insecure HTTP connections.");
+        } else {
+            logger.info("Embedded ZooKeeper not configured for unsecured HTTP 
connections.");
+        }
+
+        // Disallow partial TLS configurations for ZK, it's either all or 
nothing to avoid inconsistent setups, see NIFI-7203:
+        final Set<String> zkPropKeys = ZOOKEEPER_TO_NIFI_PROPERTIES.keySet();
+        final int zkConfiguredPropCount = zkPropKeys.stream().mapToInt(key -> 
zkProperties.containsKey(key) ? 1 : 0).sum();
+        if (zkConfiguredPropCount != 0 && zkConfiguredPropCount != 
zkPropKeys.size()) {
+            throw new ConfigException("Embedded ZooKeeper configuration 
incomplete; either all TLS properties must be set or none must be set to avoid 
inconsistent or partial configurations.");
+        }
+
+        // Set HTTPS client port:
+        final InetSocketAddress secureClientAddress = 
peerConfig.getSecureClientPortAddress();
+        final String connectString = 
niFiProperties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
+
+        if (secureClientAddress == null && connectString == null) {
+            final int secureClientPort = 
SocketUtils.findAvailableTcpPort(MIN_AVAILABLE_PORT);
+            zkProperties.setProperty("secureClientPort", 
String.valueOf(secureClientPort));
+            logger.info("Secure client port was not set, found and set 
available port {}", secureClientPort);
+
+        } else if (secureClientAddress != null && connectString == null) {
+            final int secureClientPort = secureClientAddress.getPort();
+            zkProperties.setProperty("secureClientPort", 
String.valueOf(secureClientPort));
+            logger.info("Secure client port set from ZooKeeper configuration, 
set port {}", secureClientPort);
+
+        } else if (secureClientAddress == null) {
+            final InetSocketAddress selectedServerAddress = 
getInetSocketAddress(connectString, "ZK not configured with secure port and 
NiFi ZK client connection string not usable.");
+            final int port = selectedServerAddress.getPort();
+            zkProperties.setProperty("secureClientPort", String.valueOf(port));
+            logger.info("Secure client port set from NiFi ZK connection 
string, set port {}", port);
+
+        } else {
+            final InetSocketAddress selectedServerAddress = 
getInetSocketAddress(connectString, "ZK configured with secure port but NiFi ZK 
client connection string not usable.");
+            if (secureClientAddress.getPort() != 
selectedServerAddress.getPort()) {
+                logger.warn("Potential mismatch between NiFi ZK client 
connection string and embedded ZK server secure port.");
+            } else {
+                logger.info("Matched ZK client connection string {} with 
embedded ZK server secure port: {}", connectString, secureClientAddress);
+            }
+        }
+
+        // If the ZK server connection factory isn't specified, set it to the 
one recommended for TLS:
+        final String cnxnSysKey = 
ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY;
+        final String cnxnPropKey = "serverCnxnFactory";
+        if ((zkProperties.getProperty(cnxnPropKey) == null) && 
System.getProperty(cnxnSysKey) == null) {
+            zkProperties.setProperty(cnxnPropKey, SERVER_CNXN_FACTORY);
+        }
+
+        // Copy the NiFi properties if needed:
+        if (zkConfiguredPropCount == 0) {
+            zkPropKeys.forEach(zkKey -> {
+                final String nifiKey = ZOOKEEPER_TO_NIFI_PROPERTIES.get(zkKey);
+                final String logValue = (zkKey.endsWith(".password") ? 
"********" : niFiProperties.getProperty(nifiKey));
+                zkProperties.setProperty(zkKey, 
niFiProperties.getProperty(nifiKey));
+                logger.info("Mapped NiFi property '{}' to ZooKeeper property 
'{}' with value '{}'", nifiKey, zkKey, logValue);
+            });
+        } else {
+            logger.info("NiFi properties not mapped to ZooKeeper properties, 
all properties already set.");
+        }
+
+        // Recreate and reload the adjusted properties to ensure they're still 
valid for ZK:
+        peerConfig = new QuorumPeerConfig();
+        peerConfig.parseProperties(zkProperties);
+        return peerConfig;
+    }
+
+    private static InetSocketAddress getInetSocketAddress(String 
connectString, String message) throws ConfigException {

Review comment:
       As mentioned, I don't think this is doing the right thing for clusters 
because of the `findFirst()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to