This is an automated email from the ASF dual-hosted git repository.
rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 4508edcc1 STORM-4075 Supprt mTLS between Storm and ZK (#3692)
4508edcc1 is described below
commit 4508edcc1655df22bfdf1f52ad83cc3a45486b3c
Author: puru <[email protected]>
AuthorDate: Mon Sep 30 19:22:42 2024 -0700
STORM-4075 Supprt mTLS between Storm and ZK (#3692)
* STORM-4075 Supprt mTLS between Storm and ZK
* STORM-4075 Supprt mTLS between Storm and ZK - review comments
---------
Co-authored-by: purshotam shah <[email protected]>
---
conf/defaults.yaml | 2 +
storm-client/pom.xml | 5 +
storm-client/src/jvm/org/apache/storm/Config.java | 19 +++
.../jvm/org/apache/storm/utils/CuratorUtils.java | 118 +++++++++++++
.../org/apache/storm/utils/CuratorUtilsTest.java | 183 +++++++++++++++++++++
5 files changed, 327 insertions(+)
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 1272d7910..436809972 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -35,6 +35,8 @@ storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.zookeeper.auth.user: null
storm.zookeeper.auth.password: null
+storm.zookeeper.ssl.enable: false
+storm.zookeeper.ssl.hostnameVerification: false
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin"
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 15c08bc6c..04f351de0 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -128,6 +128,11 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java
b/storm-client/src/jvm/org/apache/storm/Config.java
index 5b17a9c42..a098e6cb6 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1419,6 +1419,25 @@ public class Config extends HashMap<String, Object> {
*/
@IsString
public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME =
"storm.zookeeper.topology.auth.scheme";
+
+ /** Enable SSL/TLS for ZooKeeper client connection. */
+ @IsBoolean
+ public static final String ZK_SSL_ENABLE = "storm.zookeeper.ssl.enable";
+ /** Keystore location for ZooKeeper client connection over SSL. */
+ @IsString
+ public static final String STORM_ZOOKEEPER_SSL_KEYSTORE_PATH =
"storm.zookeeper.ssl.keystore.path";
+ /** Keystore password for ZooKeeper client connection over SSL. */
+ @IsString
+ public static final String STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD =
"storm.zookeeper.ssl.keystore.password";
+ /** Truststore location for ZooKeeper client connection over SSL. */
+ @IsString
+ public static final String STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH =
"storm.zookeeper.ssl.truststore.path";
+ /** Truststore password for ZooKeeper client connection over SSL. */
+ @IsString
+ public static final String STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD =
"storm.zookeeper.ssl.truststore.password";
+ /** Enable or disable hostname verification.*/
+ @IsBoolean
+ public static final String STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION =
"storm.zookeeper.ssl.hostnameVerification";
/**
* The delegate for serializing metadata, should be used for serialized
objects stored in zookeeper and on disk. This is NOT used for
* compressing serialized tuples sent between topologies.
diff --git a/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
b/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
index 843b26f77..867ddf85b 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
@@ -21,17 +21,22 @@ package org.apache.storm.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import javax.naming.ConfigurationException;
import org.apache.storm.Config;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import
org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.shade.org.apache.curator.framework.api.ACLProvider;
+import org.apache.storm.shade.org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.storm.shade.org.apache.zookeeper.common.ClientX509Util;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CuratorUtils {
public static final Logger LOG =
LoggerFactory.getLogger(CuratorUtils.class);
+ public static final String CLIENT_CNXN
+ =
org.apache.storm.shade.org.apache.zookeeper.ClientCnxnSocketNetty.class.getName();
public static CuratorFramework newCurator(Map<String, Object> conf,
List<String> servers, Object port, String root,
List<ACL> defaultAcl) {
@@ -84,6 +89,119 @@ public class CuratorUtils {
if (auth != null && auth.scheme != null && auth.payload != null) {
builder.authorization(auth.scheme, auth.payload);
}
+ boolean sslEnabled =
ObjectReader.getBoolean(conf.get(Config.ZK_SSL_ENABLE), false);
+ if (sslEnabled) {
+ SslConf sslConf = new SslConf(conf);
+ ZKClientConfig zkClientConfig = new ZKClientConfig();
+ try {
+ setSslConfiguration(zkClientConfig, new ClientX509Util(),
sslConf);
+ } catch (ConfigurationException e) {
+ throw new RuntimeException(e);
+ }
+ builder.zkClientConfig(zkClientConfig);
+ }
+ }
+
+ /**
+ * Configure ZooKeeper Client with SSL/TLS connection.
+ * @param zkClientConfig ZooKeeper Client configuration
+ * @param x509Util The X509 utility
+ * @param sslConf The truststore and keystore configs
+ */
+ private static void setSslConfiguration(ZKClientConfig zkClientConfig,
+ ClientX509Util x509Util, SslConf
sslConf)
+ throws ConfigurationException {
+ validateSslConfiguration(sslConf);
+ LOG.info("Configuring the ZooKeeper client to use SSL/TLS encryption
for connecting to the "
+ + "ZooKeeper server.");
+ LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
+ sslConf.keystoreLocation,
+ Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH);
+ LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
+ sslConf.truststoreLocation,
+ Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH);
+
+ zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+ zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+ CLIENT_CNXN);
+ zkClientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(),
+ sslConf.keystoreLocation);
+ zkClientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(),
+ sslConf.keystorePassword);
+ zkClientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(),
+ sslConf.truststoreLocation);
+ zkClientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(),
+ sslConf.truststorePassword);
+
zkClientConfig.setProperty(x509Util.getSslHostnameVerificationEnabledProperty(),
+ sslConf.hostnameVerification.toString());
+ }
+
+ private static void validateSslConfiguration(SslConf sslConf) throws
ConfigurationException {
+ if (StringUtils.isEmpty(sslConf.getKeystoreLocation())) {
+ throw new ConfigurationException(
+ "The keystore location parameter is empty for the
ZooKeeper client connection.");
+ }
+ if (StringUtils.isEmpty(sslConf.getKeystorePassword())) {
+ throw new ConfigurationException(
+ "The keystore password parameter is empty for the
ZooKeeper client connection.");
+ }
+ if (StringUtils.isEmpty(sslConf.getTruststoreLocation())) {
+ throw new ConfigurationException(
+ "The truststore location parameter is empty for the
ZooKeeper client connection" + ".");
+ }
+ if (StringUtils.isEmpty(sslConf.getTruststorePassword())) {
+ throw new ConfigurationException(
+ "The truststore password parameter is empty for the
ZooKeeper client connection" + ".");
+ }
+ }
+
+ public static SslConf getSslConf(Map<String, Object> conf) {
+ return new SslConf(conf);
+ }
+ /**
+ * Helper class to contain the Truststore/Keystore paths for the ZK client
connection over
+ * SSL/TLS.
+ */
+
+ static final class SslConf {
+ private final String keystoreLocation;
+ private final String keystorePassword;
+ private final String truststoreLocation;
+ private final String truststorePassword;
+ private final Boolean hostnameVerification;
+
+ /**
+ * Configuration for the ZooKeeper connection when SSL/TLS is enabled.
+ *
+ * @param conf configuration map
+ */
+ private SslConf(Map<String, Object> conf) {
+ keystoreLocation =
ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH), "");
+ keystorePassword =
ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD),
"");
+ truststoreLocation =
ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH),
"");
+ truststorePassword =
ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD),
"");
+ hostnameVerification =
ObjectReader.getBoolean(conf.get(Config.STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION),
true);
+ }
+
+ public String getKeystoreLocation() {
+ return keystoreLocation;
+ }
+
+ public String getKeystorePassword() {
+ return keystorePassword;
+ }
+
+ public String getTruststoreLocation() {
+ return truststoreLocation;
+ }
+
+ public String getTruststorePassword() {
+ return truststorePassword;
+ }
+
+ public Boolean getHostnameVerification() {
+ return hostnameVerification;
+ }
}
public static void testSetupBuilder(CuratorFrameworkFactory.Builder
diff --git a/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java
b/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java
index daef2886e..68f730fc6 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/CuratorUtilsTest.java
@@ -18,22 +18,46 @@
package org.apache.storm.utils;
+import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import org.apache.curator.test.InstanceSpec;
import org.apache.storm.Config;
import org.apache.storm.cluster.DaemonType;
import
org.apache.storm.shade.org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.storm.shade.org.apache.curator.framework.AuthInfo;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import
org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.storm.shade.org.apache.zookeeper.ClientCnxnSocketNetty;
+import org.apache.storm.shade.org.apache.zookeeper.ZooKeeper;
+import org.apache.storm.shade.org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.storm.shade.org.apache.zookeeper.common.ClientX509Util;
import org.junit.jupiter.api.Test;
+import org.apache.curator.test.TestingServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class CuratorUtilsTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(CuratorUtilsTest.class);
+ private static final int JUTE_MAXBUFFER = 400000000;
+ private static final int SECURE_CLIENT_PORT = 6065;
+ private static final boolean DELETE_DATA_DIRECTORY_ON_CLOSE = true;
+ static final File ZK_DATA_DIR = new
File("testZkSSLClientConnectionDataDir");
+ private static final int SERVER_ID = 1;
+ private static final int TICK_TIME = 100;
+ private static final int MAX_CLIENT_CNXNS = 10;
+ private static final int ELECTION_PORT = -1;
+ private static final int QUORUM_PORT = -1;
+
+ private TestingServer server;
+
@Test
public void newCuratorUsesExponentialBackoffTest() {
final int expectedInterval = 2400;
@@ -86,4 +110,163 @@ public class CuratorUtilsTest {
CuratorUtils.testSetupBuilder(builder, zkStr, conf, auth);
return builder;
}
+
+ /**
+ * A method to configure the test ZK server to accept secure client
connection.
+ * The self-signed certificates were generated for testing purposes as
described below.
+ * For the ZK client to connect with the ZK server, the ZK server's
keystore and truststore
+ * should be used.
+ * For testing purposes the keystore and truststore were generated using
default values.
+ * 1. to generate the keystore.jks file:
+ * # keytool -genkey -alias mockcert -keyalg RSA -keystore keystore.jks
-keysize 2048
+ * 2. generate the ca-cert and the ca-key:
+ * # openssl req -new -x509 -keyout ca-key -out ca-cert
+ * 3. to generate the certificate signing request (cert-file):
+ * # keytool -keystore keystore.jks -alias mockcert -certreq -file
certificate-request
+ * 4. to generate the ca-cert.srl file and make the cert valid for 10
years:
+ * # openssl x509 -req -CA ca-cert -CAkey ca-key -in certificate-request
-out cert-signed
+ * -days 3650 -CAcreateserial -passin pass:password
+ * 5. add the ca-cert to the keystore.jks:
+ * # keytool -keystore keystore.jks -alias mockca -import -file ca-cert
+ * 6. install the signed certificate to the keystore:
+ * # keytool -keystore keystore.jks -alias mockcert -import -file
cert-signed
+ * 7. add the certificate to the truststore:
+ * # keytool -keystore truststore.jks -alias mockcert -import -file ca-cert
+ * For our purpose, we only need the end result of this process: the
keystore.jks and the
+ * truststore.jks files.
+ *
+ * @return conf The method returns the updated Configuration.
+ */
+ public Map<String, Object> setUpSecureConfig(String testDataPath) throws
Exception {
+ System.setProperty("zookeeper.ssl.keyStore.location", testDataPath +
"testKeyStore.jks");
+ System.setProperty("zookeeper.ssl.keyStore.password", "testpass");
+ System.setProperty("zookeeper.ssl.trustStore.location", testDataPath +
"testTrustStore.jks");
+ System.setProperty("zookeeper.ssl.trustStore.password", "testpass");
+ System.setProperty("zookeeper.request.timeout", "12345");
+ System.setProperty("zookeeper.serverCnxnFactory",
"org.apache.zookeeper.server.NettyServerCnxnFactory");
+ System.setProperty("jute.maxbuffer", String.valueOf(JUTE_MAXBUFFER));
+
+ System.setProperty("javax.net.debug", "ssl");
+ System.setProperty("zookeeper.authProvider.x509",
+ "org.apache.zookeeper.server.auth" +
".X509AuthenticationProvider");
+
+ // inject values to the ZK configuration file for secure connection
+ Map<String, Object> customConfiguration = new HashMap<>();
+ customConfiguration.put("secureClientPort",
String.valueOf(SECURE_CLIENT_PORT));
+ customConfiguration.put("audit.enable", "true");
+ InstanceSpec spec =
+ new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
ELECTION_PORT, QUORUM_PORT,
+ DELETE_DATA_DIRECTORY_ON_CLOSE, SERVER_ID, TICK_TIME,
MAX_CLIENT_CNXNS,
+ customConfiguration);
+
+ this.server = new TestingServer(spec, false);
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH,
+ testDataPath + "testKeyStore.jks");
+ conf.put(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD, "testpass");
+ conf.put(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH,
+ testDataPath + "testTrustStore.jks");
+ conf.put(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD, "testpass");
+ conf.put(Config.STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION, false);
+ return conf;
+ }
+
+ @Test
+ public void testSecureZKConfiguration() throws Exception {
+ LOG.info("Entered to the testSecureZKConfiguration test case.");
+ Map<String, Object> conf = setUpSecureConfig("test/resources/ssl/");
+ conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 0);
+ conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 0);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 0);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING, 0);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 0);
+ conf.put(Config.ZK_SSL_ENABLE, true);
+ String zkStr = this.server.getConnectString();
+ CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory.builder();
+ CuratorUtils.testSetupBuilder(builder, zkStr, conf, null);
+ CuratorFramework curatorFramework = builder.build();
+ curatorFramework.start();
+ ZooKeeper zk = curatorFramework.getZookeeperClient().getZooKeeper();
+
validateSSLConfiguration(ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH)),
+
ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD)),
+
ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH)),
+
ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)),
+
ObjectReader.getBoolean(conf.get(Config.STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION),
true),
+ zk);
+ this.server.close();
+ }
+
+ private void validateSSLConfiguration(String keystoreLocation, String
keystorePassword,
+ String truststoreLocation, String
truststorePassword,
+ Boolean hostNameVerification,
ZooKeeper zk) {
+ try (ClientX509Util x509Util = new ClientX509Util()) {
+ //testing if custom values are set properly
+ assertEquals(keystoreLocation,
+
zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty())
+ , "Validate that expected clientConfig is set in ZK
config");
+ assertEquals(keystorePassword,
+
zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty())
+ , "Validate that expected clientConfig is set in ZK config");
+ assertEquals(truststoreLocation,
+
zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty())
+ , "Validate that expected clientConfig is set in ZK config");
+ assertEquals(truststorePassword,
+
zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty())
+ , "Validate that expected clientConfig is set in ZK config");
+ assertEquals(hostNameVerification.toString(),
+
zk.getClientConfig().getProperty(x509Util.getSslHostnameVerificationEnabledProperty())
+ , "Validate that expected clientConfig is set in ZK
config");
+ }
+ //testing if constant values hardcoded into the code are set properly
+ assertEquals(Boolean.TRUE.toString(),
+ zk.getClientConfig().getProperty(ZKClientConfig.SECURE_CLIENT)
+ , "Validate that expected clientConfig is set in ZK config");
+ assertEquals(ClientCnxnSocketNetty.class.getCanonicalName(),
+
zk.getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET)
+ , "Validate that expected clientConfig is set in ZK config");
+ }
+
+ @Test
+ public void testTruststoreKeystoreConfiguration() {
+ LOG.info("Entered to the testTruststoreKeystoreConfiguration test
case.");
+ /*
+ By default the truststore/keystore configurations are not set, hence the
values are null.
+ Validate that the null values are converted into empty strings by the
class.
+ */
+ Map<String, Object> conf = new HashMap<>();
+ CuratorUtils.SslConf zkSslConf =
+ CuratorUtils.getSslConf(conf);
+
+ assertEquals("",
+ zkSslConf.getKeystoreLocation(), "Validate that null value is
converted to empty string.");
+ assertEquals("",
+ zkSslConf.getKeystorePassword(), "Validate that null value is
converted to empty string.");
+ assertEquals("",
+ zkSslConf.getTruststoreLocation(), "Validate that null value
is converted to empty string.");
+ assertEquals("",
+ zkSslConf.getTruststorePassword(), "Validate that null value
is converted to empty string.");
+ assertEquals(true,
+ zkSslConf.getHostnameVerification(), "Validate that null value
is converted to false.");
+
+
+ //Validate that non-null values will remain intact
+ conf.put(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH, "/keystore.jks");
+ conf.put(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD,
"keystorePassword");
+ conf.put(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH,
"/truststore.jks");
+ conf.put(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD,
"truststorePassword");
+ conf.put(Config.STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION, false);
+
+ zkSslConf =
+ CuratorUtils.getSslConf(conf);
+ assertEquals("/keystore.jks",
+ zkSslConf.getKeystoreLocation(), "Validate that non-null value
kept intact.");
+ assertEquals("keystorePassword",
+ zkSslConf.getKeystorePassword(), "Validate that non-null value
kept intact.");
+ assertEquals("/truststore.jks",
+ zkSslConf.getTruststoreLocation(), "Validate that non-null
value kept intact.");
+ assertEquals("truststorePassword",
+ zkSslConf.getTruststorePassword(), "Validate that non-null
value kept intact.");
+ assertEquals(false,
+ zkSslConf.getHostnameVerification(), "Validate that non-null
value kept intact.");
+ }
}