jfrazee commented on a change in pull request #4216:
URL: https://github.com/apache/nifi/pull/4216#discussion_r418709863
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
##########
@@ -198,6 +217,93 @@ public static ZooKeeperStateServer create(final
NiFiProperties properties) throw
zkProperties.load(bis);
}
- return new ZooKeeperStateServer(zkProperties);
+ return new ZooKeeperStateServer(reconcileProperties(properties,
zkProperties));
+ }
+
+ private static QuorumPeerConfig reconcileProperties(NiFiProperties
niFiProperties, Properties zkProperties) throws IOException, ConfigException {
+ QuorumPeerConfig peerConfig = new QuorumPeerConfig();
+ peerConfig.parseProperties(zkProperties);
+
+ // If this is an insecure NiFi no changes are needed:
+ if (!niFiProperties.isHTTPSConfigured()) {
+ logger.info("NiFi properties not mapped to ZooKeeper properties
because NiFi is insecure.");
+ return peerConfig;
+ }
+
+ // Remove HTTP client ports and addresses and warn if set, see
NIFI-7203:
+ InetSocketAddress clientPort = peerConfig.getClientPortAddress();
+ if (clientPort != null) {
+ zkProperties.remove("clientPort");
+ zkProperties.remove("clientPortAddress");
+ logger.warn("Invalid configuration detected: secure NiFi with
embedded ZooKeeper configured for unsecured HTTP connections.");
+ logger.warn("Removed HTTP port from embedded ZooKeeper
configuration to deactivate insecure HTTP connections.");
+ } else {
+ logger.info("Embedded ZooKeeper not configured for unsecured HTTP
connections.");
+ }
+
+ // Disallow partial TLS configurations for ZK, it's either all or
nothing to avoid inconsistent setups, see NIFI-7203:
+ final Set<String> zkPropKeys = ZOOKEEPER_TO_NIFI_PROPERTIES.keySet();
+ final int zkConfiguredPropCount = zkPropKeys.stream().mapToInt(key ->
zkProperties.containsKey(key) ? 1 : 0).sum();
+ if (zkConfiguredPropCount != 0 && zkConfiguredPropCount !=
zkPropKeys.size()) {
+ throw new ConfigException("Embedded ZooKeeper configuration
incomplete; either all TLS properties must be set or none must be set to avoid
inconsistent or partial configurations.");
+ }
+
+ // Set HTTPS client port:
+ final InetSocketAddress secureClientAddress =
peerConfig.getSecureClientPortAddress();
+ final String connectString =
niFiProperties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
+
+ if (secureClientAddress == null && connectString == null) {
+ final int secureClientPort =
SocketUtils.findAvailableTcpPort(MIN_AVAILABLE_PORT);
+ zkProperties.setProperty("secureClientPort",
String.valueOf(secureClientPort));
+ logger.info("Secure client port was not set, found and set
available port {}", secureClientPort);
+
+ } else if (secureClientAddress != null && connectString == null) {
+ final int secureClientPort = secureClientAddress.getPort();
+ zkProperties.setProperty("secureClientPort",
String.valueOf(secureClientPort));
+ logger.info("Secure client port set from ZooKeeper configuration,
set port {}", secureClientPort);
+
+ } else if (secureClientAddress == null) {
+ final InetSocketAddress selectedServerAddress =
getInetSocketAddress(connectString, "ZK not configured with secure port and
NiFi ZK client connection string not usable.");
+ final int port = selectedServerAddress.getPort();
+ zkProperties.setProperty("secureClientPort", String.valueOf(port));
+ logger.info("Secure client port set from NiFi ZK connection
string, set port {}", port);
+
+ } else {
+ final InetSocketAddress selectedServerAddress =
getInetSocketAddress(connectString, "ZK configured with secure port but NiFi ZK
client connection string not usable.");
+ if (secureClientAddress.getPort() !=
selectedServerAddress.getPort()) {
+ logger.warn("Potential mismatch between NiFi ZK client
connection string and embedded ZK server secure port.");
+ } else {
+ logger.info("Matched ZK client connection string {} with
embedded ZK server secure port: {}", connectString, secureClientAddress);
+ }
+ }
+
+ // If the ZK server connection factory isn't specified, set it to the
one recommended for TLS:
+ final String cnxnSysKey =
ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY;
+ final String cnxnPropKey = "serverCnxnFactory";
+ if ((zkProperties.getProperty(cnxnPropKey) == null) &&
System.getProperty(cnxnSysKey) == null) {
+ zkProperties.setProperty(cnxnPropKey, SERVER_CNXN_FACTORY);
+ }
+
+ // Copy the NiFi properties if needed:
+ if (zkConfiguredPropCount == 0) {
+ zkPropKeys.forEach(zkKey -> {
+ final String nifiKey = ZOOKEEPER_TO_NIFI_PROPERTIES.get(zkKey);
+ final String logValue = (zkKey.endsWith(".password") ?
"********" : niFiProperties.getProperty(nifiKey));
+ zkProperties.setProperty(zkKey,
niFiProperties.getProperty(nifiKey));
+ logger.info("Mapped NiFi property '{}' to ZooKeeper property
'{}' with value '{}'", nifiKey, zkKey, logValue);
+ });
+ } else {
+ logger.info("NiFi properties not mapped to ZooKeeper properties,
all properties already set.");
+ }
+
+ // Recreate and reload the adjusted properties to ensure they're still
valid for ZK:
+ peerConfig = new QuorumPeerConfig();
+ peerConfig.parseProperties(zkProperties);
+ return peerConfig;
+ }
+
+ private static InetSocketAddress getInetSocketAddress(String
connectString, String message) throws ConfigException {
Review comment:
As mentioned I don't think this is doing the right thing for clusters
because of the `findFirst()`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]