This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 19e74e80a8 Added dynamic SSL initialization support for the Kafka
client (#12249)
19e74e80a8 is described below
commit 19e74e80a825140a515eb9161d543b47d6870a8d
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Tue Jan 16 09:34:35 2024 -0800
Added dynamic SSL initialization support for the Kafka client (#12249)
---
.../pinot-stream-ingestion/pinot-kafka-2.0/pom.xml | 13 +
.../KafkaPartitionLevelConnectionHandler.java | 1 +
.../pinot/plugin/stream/kafka20/KafkaSSLUtils.java | 339 +++++++++++++++++++++
.../plugin/stream/kafka20/KafkaSSLUtilsTest.java | 310 +++++++++++++++++++
4 files changed, 663 insertions(+)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
index bd2ac3c5e0..7ee33c3b7a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
@@ -37,6 +37,7 @@
<pinot.root>${basedir}/../../..</pinot.root>
<kafka.lib.version>2.8.1</kafka.lib.version>
<phase.prop>package</phase.prop>
+ <bouncycastle.version>1.70</bouncycastle.version>
</properties>
<dependencies>
@@ -111,5 +112,17 @@
<artifactId>metrics-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <version>${bouncycastle.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ <version>${bouncycastle.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index 00371acaba..f0512cc8b3 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -67,6 +67,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
_config.getKafkaIsolationLevel());
}
consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
+ KafkaSSLUtils.initSSL(consumerProp);
_consumer = createConsumer(consumerProp);
_topicPartition = new TopicPartition(_topic, _partition);
_consumer.assign(Collections.singletonList(_topicPartition));
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java
new file mode 100644
index 0000000000..63f285b785
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyFactory;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SSL utils class which helps in initialization of Kafka client SSL
configuration. The class can install the
+ * provided server certificate enabling one-way SSL or it can install the
server certificate and the
+ * client certificates enabling two-way SSL.
+ */
+public class KafkaSSLUtils {
+
+ private KafkaSSLUtils() {
+ // private on purpose
+ }
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaSSLUtils.class);
+ // Value constants
+ private static final String DEFAULT_CERTIFICATE_TYPE = "X.509";
+ private static final String DEFAULT_KEY_ALGORITHM = "RSA";
+ private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12";
+ private static final String DEFAULT_SECURITY_PROTOCOL = "SSL";
+ private static final String DEFAULT_TRUSTSTORE_TYPE = "jks";
+ private static final String DEFAULT_SERVER_ALIAS = "ServerAlias";
+ private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias";
+ // Key constants
+ private static final String SSL_TRUSTSTORE_LOCATION =
"ssl.truststore.location";
+ private static final String SSL_TRUSTSTORE_PASSWORD =
"ssl.truststore.password";
+ private static final String SECURITY_PROTOCOL = "security.protocol";
+ private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location";
+ private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password";
+ private static final String SSL_KEY_PASSWORD = "ssl.key.password";
+ private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE =
"stream.kafka.ssl.server.certificate";
+ private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE =
"stream.kafka.ssl.certificate.type";
+ private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type";
+ private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE =
"stream.kafka.ssl.client.certificate";
+ private static final String STREAM_KAFKA_SSL_CLIENT_KEY =
"stream.kafka.ssl.client.key";
+ private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM =
"stream.kafka.ssl.client.key.algorithm";
+ private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type";
+
+ public static void initSSL(Properties consumerProps) {
+ // Check if one-way SSL is enabled. In this scenario, the client validates
the server certificate.
+ String trustStoreLocation =
consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION);
+ String trustStorePassword =
consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD);
+ String serverCertificate =
consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE);
+ if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword,
serverCertificate)) {
+ LOGGER.info("Skipping auto SSL server validation since it's not
configured.");
+ return;
+ }
+ if (shouldRenewTrustStore(consumerProps)) {
+ initTrustStore(consumerProps);
+ }
+
+ // Set the security protocol
+ String securityProtocol = consumerProps.getProperty(SECURITY_PROTOCOL,
DEFAULT_SECURITY_PROTOCOL);
+ consumerProps.setProperty(SECURITY_PROTOCOL, securityProtocol);
+
+ // Check if two-way SSL is enabled. In this scenario, the client validates
the server's certificate and the server
+ // validates the client's certificate.
+ String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION);
+ String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD);
+ String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD);
+ String clientCertificate =
consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE);
+ if (StringUtils.isAnyEmpty(keyStoreLocation, keyStorePassword,
keyPassword, clientCertificate)) {
+ LOGGER.info("Skipping auto SSL client validation since it's not
configured.");
+ return;
+ }
+ if (shouldRenewKeyStore(consumerProps)) {
+ initKeyStore(consumerProps);
+ }
+ }
+
+ @VisibleForTesting
+ static void initTrustStore(Properties consumerProps) {
+ Path trustStorePath = getTrustStorePath(consumerProps);
+ if (Files.exists(trustStorePath)) {
+ deleteFile(trustStorePath);
+ }
+ LOGGER.info("Initializing the SSL trust store");
+ try {
+ // Create the trust store path
+ createFile(trustStorePath);
+ } catch (FileAlreadyExistsException fex) {
+ LOGGER.warn("SSL trust store initialization failed as trust store
already exists.");
+ return;
+ } catch (IOException iex) {
+ throw new RuntimeException(String.format("Failed to create the trust
store path: %s", trustStorePath), iex);
+ }
+
+ try {
+ String trustStorePassword =
consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD);
+ String serverCertificate =
consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE);
+ String certificateType =
consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE,
DEFAULT_CERTIFICATE_TYPE);
+ String trustStoreType = consumerProps.getProperty(SSL_TRUSTSTORE_TYPE,
DEFAULT_TRUSTSTORE_TYPE);
+ consumerProps.setProperty(SSL_TRUSTSTORE_TYPE, trustStoreType);
+
+ // Decode the Base64 string
+ byte[] certBytes = Base64.getDecoder().decode(serverCertificate);
+ InputStream certInputStream = new ByteArrayInputStream(certBytes);
+
+ // Create a Certificate object
+ CertificateFactory certificateFactory =
CertificateFactory.getInstance(certificateType);
+ Certificate certificate =
certificateFactory.generateCertificate(certInputStream);
+
+ // Create a TrustStore and load the default TrustStore
+ KeyStore trustStore = KeyStore.getInstance(trustStoreType);
+
+ // Initialize the TrustStore
+ trustStore.load(null, null);
+
+ // Add the server certificate to the truststore
+ trustStore.setCertificateEntry(DEFAULT_SERVER_ALIAS, certificate);
+
+ // Save the keystore to a file
+ try (FileOutputStream fos = new
FileOutputStream(trustStorePath.toString())) {
+ trustStore.store(fos, trustStorePassword.toCharArray());
+ }
+ LOGGER.info("Initialized the SSL trust store.");
+ } catch (Exception ex) {
+ throw new RuntimeException("Error initializing the SSL trust store", ex);
+ }
+ }
+
+ @VisibleForTesting
+ static void initKeyStore(Properties consumerProps) {
+ Path keyStorePath = getKeyStorePath(consumerProps);
+ if (Files.exists(keyStorePath)) {
+ deleteFile(keyStorePath);
+ }
+ LOGGER.info("Initializing the SSL key store");
+ try {
+ // Create the key store path
+ createFile(keyStorePath);
+ } catch (FileAlreadyExistsException fex) {
+ LOGGER.warn("SSL key store initialization failed as key store already
exists.");
+ return;
+ } catch (IOException iex) {
+ throw new RuntimeException(String.format("Failed to create the key store
path: %s", keyStorePath), iex);
+ }
+
+ String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD);
+ String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD);
+ String clientCertificate =
consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE);
+ String certificateType =
consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE,
DEFAULT_CERTIFICATE_TYPE);
+ String privateKeyString =
consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY);
+ String privateKeyAlgorithm =
consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM,
+ DEFAULT_KEY_ALGORITHM);
+ String keyStoreType = consumerProps.getProperty(SSL_KEYSTORE_TYPE,
DEFAULT_KEYSTORE_TYPE);
+ consumerProps.setProperty(SSL_KEYSTORE_TYPE, keyStoreType);
+
+ try {
+ // decode the private key and certificate into bytes
+ byte[] pkBytes = Base64.getDecoder().decode(privateKeyString);
+ byte[] certBytes = Base64.getDecoder().decode(clientCertificate);
+
+ // Create the private key object
+ PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkBytes);
+ KeyFactory keyFactory = KeyFactory.getInstance(privateKeyAlgorithm);
+ PrivateKey privateKey = keyFactory.generatePrivate(keySpec);
+
+ // Create the Certificate object
+ CertificateFactory certFactory =
CertificateFactory.getInstance(certificateType);
+ InputStream certInputStream = new ByteArrayInputStream(certBytes);
+ Certificate certificate =
certFactory.generateCertificate(certInputStream);
+
+ // Create a KeyStore object and load a new empty keystore
+ KeyStore keyStore = KeyStore.getInstance(keyStoreType);
+ keyStore.load(null, null);
+
+ // Add the key pair and certificate to the keystore
+ KeyStore.PrivateKeyEntry privateKeyEntry = new KeyStore.PrivateKeyEntry(
+ privateKey, new Certificate[]{certificate}
+ );
+ KeyStore.PasswordProtection keyPasswordProtection = new
KeyStore.PasswordProtection(keyPassword.toCharArray());
+ keyStore.setEntry(DEFAULT_CLIENT_ALIAS, privateKeyEntry,
keyPasswordProtection);
+
+ // Save the keystore to the specified location
+ try (FileOutputStream fos = new
FileOutputStream(keyStorePath.toString())) {
+ keyStore.store(fos, keyStorePassword.toCharArray());
+ }
+ LOGGER.info("Initialized the SSL key store.");
+ } catch (Exception ex) {
+ throw new RuntimeException("Error initializing the SSL key store", ex);
+ }
+ }
+
+ private static Path getTrustStorePath(Properties consumerProps) {
+ String trustStoreLocation =
consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION);
+ return Paths.get(trustStoreLocation);
+ }
+
+ private static Path getKeyStorePath(Properties consumerProps) {
+ String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION);
+ return Paths.get(keyStoreLocation);
+ }
+
+ // Renew the trust store if needed.
+ private static boolean shouldRenewTrustStore(Properties consumerProps) {
+ boolean renewTrustStore;
+ Path trustStorePath = getTrustStorePath(consumerProps);
+ String trustStorePassword =
consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD);
+ String serverCertificate =
consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE);
+ String certificateType =
consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE,
DEFAULT_CERTIFICATE_TYPE);
+
+ try {
+ // Load the trust store
+ KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (FileInputStream fis = new
FileInputStream(trustStorePath.toString())) {
+ trustStore.load(fis, trustStorePassword.toCharArray());
+ }
+
+ // Decode the provided certificate
+ byte[] decodedCertBytes = Base64.getDecoder().decode(serverCertificate);
+ CertificateFactory certFactory =
CertificateFactory.getInstance(certificateType);
+ Certificate providedCertificate = certFactory.generateCertificate(new
ByteArrayInputStream(decodedCertBytes));
+
+ // Get the certificate from the trust store
+ Certificate trustStoreCertificate =
trustStore.getCertificate(DEFAULT_SERVER_ALIAS);
+
+ // Compare the certificates
+ renewTrustStore = !providedCertificate.equals(trustStoreCertificate);
+ } catch (FileNotFoundException fex) {
+ // create the trust store if trust store does not exist – happens the
very first time
+ renewTrustStore = true;
+ } catch (Exception ex) {
+ // renew trust store if comparison check fails
+ renewTrustStore = true;
+ LOGGER.warn("Trust store certificate comparison check failed.", ex);
+ }
+
+ return renewTrustStore;
+ }
+
+ // Renew the key store if needed.
+ private static boolean shouldRenewKeyStore(Properties consumerProps) {
+ boolean renewKeyStore;
+ Path keyStorePath = getKeyStorePath(consumerProps);
+ String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD);
+ String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD);
+ String certificateType =
consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE,
DEFAULT_CERTIFICATE_TYPE);
+ String clientCertificate =
consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE);
+ String privateKeyAlgorithm =
consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM,
+ DEFAULT_KEY_ALGORITHM);
+ String privateKeyString =
consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY);
+ try {
+ // Load the KeyStore
+ KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (FileInputStream fis = new FileInputStream(keyStorePath.toString()))
{
+ keyStore.load(fis, keyStorePassword.toCharArray());
+ }
+
+ // Extract certificate and private key from KeyStore
+ Certificate keyStoreCert = keyStore.getCertificate(DEFAULT_CLIENT_ALIAS);
+ PrivateKey keyStorePrivateKey = (PrivateKey)
keyStore.getKey(DEFAULT_CLIENT_ALIAS, keyPassword.toCharArray());
+
+ // Decode provided Base64 encoded certificate and private key
+ CertificateFactory certFactory =
CertificateFactory.getInstance(certificateType);
+ Certificate providedCert = certFactory.generateCertificate(new
ByteArrayInputStream(
+ Base64.getDecoder().decode(clientCertificate)));
+ PKCS8EncodedKeySpec keySpec = new
PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKeyString));
+ KeyFactory keyFactory = KeyFactory.getInstance(privateKeyAlgorithm);
+ PrivateKey providedPrivateKey = keyFactory.generatePrivate(keySpec);
+
+ // Compare certificates and private keys
+ boolean isCertSame = Arrays.equals(keyStoreCert.getEncoded(),
providedCert.getEncoded());
+ boolean isKeySame = Arrays.equals(keyStorePrivateKey.getEncoded(),
providedPrivateKey.getEncoded());
+ renewKeyStore = !(isCertSame && isKeySame);
+ } catch (FileNotFoundException fex) {
+ // create the key store if key store does not exist – happens the very
first time
+ renewKeyStore = true;
+ } catch (Exception ex) {
+ // renew key store if comparison check fails
+ renewKeyStore = true;
+ LOGGER.warn("Key store certificate and private key comparison checks
failed.", ex);
+ }
+ return renewKeyStore;
+ }
+
+ private static void deleteFile(Path path) {
+ try {
+ Files.deleteIfExists(path);
+ LOGGER.info(String.format("Successfully deleted file: %s", path));
+ } catch (IOException iex) {
+ LOGGER.warn(String.format("Failed to delete the file: %s", path));
+ }
+ }
+
+ private static void createFile(Path path)
+ throws IOException {
+ Path parentDir = path.getParent();
+ if (parentDir != null) {
+ Files.createDirectories(parentDir);
+ }
+ Path filePath = path.toAbsolutePath();
+ if (!Files.exists(filePath)) {
+ Files.createFile(filePath);
+ LOGGER.info(String.format("Successfully created file: %s", path));
+ }
+ }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java
new file mode 100644
index 0000000000..6f61a22e82
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.SecureRandom;
+import java.security.Security;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.UUID;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
+import org.bouncycastle.util.encoders.Base64;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class KafkaSSLUtilsTest {
+ private String _trustStorePath;
+ private String _keyStorePath;
+ private static final String DEFAULT_TRUSTSTORE_PASSWORD =
"mytruststorepassword";
+ private static final String DEFAULT_KEYSTORE_PASSWORD = "mykeystorepassword";
+
+ static {
+ // helps generate the X509Certificate
+ Security.addProvider(new BouncyCastleProvider());
+ }
+
+ @BeforeMethod
+ private void setup() {
+ _trustStorePath = "/tmp/" + UUID.randomUUID() + "/client.truststore.jks";
+ _keyStorePath = "/tmp/" + UUID.randomUUID() + "/client.keystore.p12";
+ }
+
+ @AfterMethod
+ private void cleanup() {
+ Path trustStorePath = Paths.get(_trustStorePath);
+ try {
+ Files.deleteIfExists(trustStorePath);
+ } catch (IOException ex) {
+ // ignored
+ }
+
+ Path keyStorePath = Paths.get(_keyStorePath);
+ try {
+ Files.deleteIfExists(keyStorePath);
+ } catch (IOException ex) {
+ // ignored
+ }
+ }
+
+ @Test
+ public void testInitTrustStore()
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException,
+ IOException, KeyStoreException {
+ Properties consumerProps = new Properties();
+ setTrustStoreProps(consumerProps);
+
+ // should not throw any exceptions
+ KafkaSSLUtils.initTrustStore(consumerProps);
+ validateTrustStoreCertificateCount(1);
+ }
+
+ @Test
+ public void testInitKeyStore()
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException,
+ IOException, KeyStoreException {
+ Properties consumerProps = new Properties();
+ setKeyStoreProps(consumerProps);
+
+ // should not throw any exceptions
+ KafkaSSLUtils.initKeyStore(consumerProps);
+ validateKeyStoreCertificateCount(1);
+ }
+
+ @Test
+ public void testInitSSLTrustStoreAndKeyStore()
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException,
+ KeyStoreException, IOException {
+ Properties consumerProps = new Properties();
+ setTrustStoreProps(consumerProps);
+ setKeyStoreProps(consumerProps);
+
+ // should not throw any exceptions
+ KafkaSSLUtils.initSSL(consumerProps);
+
+ // validate
+ validateTrustStoreCertificateCount(1);
+ validateKeyStoreCertificateCount(1);
+ }
+
+ @Test
+ public void testInitSSLTrustStoreOnly()
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException,
+ IOException, KeyStoreException {
+ Properties consumerProps = new Properties();
+ setTrustStoreProps(consumerProps);
+
+ // should not throw any exceptions
+ KafkaSSLUtils.initSSL(consumerProps);
+
+ // validate
+ validateTrustStoreCertificateCount(1);
+ }
+
+ @Test (expectedExceptions = java.io.FileNotFoundException.class)
+ public void testInitSSLKeyStoreOnly()
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException,
+ IOException, KeyStoreException {
+ Properties consumerProps = new Properties();
+ setKeyStoreProps(consumerProps);
+
+ // should not throw any exceptions
+ KafkaSSLUtils.initSSL(consumerProps);
+
+ // Validate that no certificates are installed
+ validateTrustStoreCertificateCount(0);
+ }
+
+ @Test
+ public void testInitSSLAndRenewCertificates()
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException,
+ IOException, KeyStoreException {
+ Properties consumerProps = new Properties();
+ setTrustStoreProps(consumerProps);
+ setKeyStoreProps(consumerProps);
+ KafkaSSLUtils.initSSL(consumerProps);
+
+ // renew the truststore and keystore
+ setTrustStoreProps(consumerProps);
+ setKeyStoreProps(consumerProps);
+ KafkaSSLUtils.initSSL(consumerProps);
+
+ // validate
+ validateTrustStoreCertificateCount(1);
+ validateKeyStoreCertificateCount(1);
+ }
+
+ @Test
+ public void testInitSSLBackwardsCompatibilityCheck()
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException,
+ IOException, KeyStoreException {
+ Properties consumerProps = new Properties();
+ setTrustStoreProps(consumerProps);
+ setKeyStoreProps(consumerProps);
+ KafkaSSLUtils.initSSL(consumerProps);
+
+ // validate
+ validateTrustStoreCertificateCount(1);
+ validateKeyStoreCertificateCount(1);
+
+ setTrustStoreProps(consumerProps); // new server certificate is generated
+ consumerProps.remove("stream.kafka.ssl.server.certificate");
+ setKeyStoreProps(consumerProps); // new client certificate is generated
+ consumerProps.remove("stream.kafka.ssl.client.certificate");
+
+ // Attempt to initialize the trust store and key store again without
passing the required certificates
+ KafkaSSLUtils.initSSL(consumerProps);
+ // validate again that the existing certificates are untouched.
+ validateTrustStoreCertificateCount(1);
+ validateKeyStoreCertificateCount(1);
+ }
+
+ private void validateTrustStoreCertificateCount(int expCount)
+ throws CertificateException, IOException, NoSuchAlgorithmException,
KeyStoreException {
+ // Validate that certificate is installed in the trust store
+ KeyStore trustStore = KeyStore.getInstance("JKS");
+ try (FileInputStream fis = new FileInputStream(_trustStorePath)) {
+ trustStore.load(fis, DEFAULT_TRUSTSTORE_PASSWORD.toCharArray());
+ }
+
+ int certCount = 0;
+ // Iterate through the aliases in the TrustStore
+ Enumeration<String> aliases = trustStore.aliases();
+ while (aliases.hasMoreElements()) {
+ String alias = aliases.nextElement();
+ // Check if the alias refers to a certificate
+ if (trustStore.isCertificateEntry(alias)) {
+ ++certCount;
+ }
+ }
+ Assert.assertEquals(expCount, certCount);
+ }
+
+ private void validateKeyStoreCertificateCount(int expCount)
+ throws CertificateException, IOException, NoSuchAlgorithmException,
KeyStoreException {
+ // Validate that certificate is installed in the key store
+ KeyStore keyStore = KeyStore.getInstance("PKCS12");
+ try (FileInputStream fis = new FileInputStream(_keyStorePath)) {
+ keyStore.load(fis, DEFAULT_KEYSTORE_PASSWORD.toCharArray());
+ }
+
+ int certCount = 0;
+ // Iterate through the aliases in the TrustStore
+ Enumeration<String> aliases = keyStore.aliases();
+ while (aliases.hasMoreElements()) {
+ String alias = aliases.nextElement();
+ // Check if the alias refers to a key
+ if (keyStore.isKeyEntry(alias)) {
+ ++certCount;
+ }
+ }
+ Assert.assertEquals(expCount, certCount);
+ }
+
+ private void setTrustStoreProps(Properties consumerProps)
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException {
+ String[] certCreationResult = generateSelfSignedCertificate();
+ String serverCertificate = certCreationResult[1];
+ consumerProps.setProperty("stream.kafka.ssl.server.certificate",
serverCertificate);
+ consumerProps.setProperty("stream.kafka.ssl.server.certificate.type",
"X.509");
+ consumerProps.setProperty("ssl.truststore.type", "jks");
+ consumerProps.setProperty("ssl.truststore.location", _trustStorePath);
+ consumerProps.setProperty("ssl.truststore.password",
DEFAULT_TRUSTSTORE_PASSWORD);
+ }
+
+ private void setKeyStoreProps(Properties consumerProps)
+ throws CertificateException, NoSuchAlgorithmException,
OperatorCreationException, NoSuchProviderException {
+ String[] certCreationResult = generateSelfSignedCertificate();
+ String privateKey = certCreationResult[0];
+ String clientCertificate = certCreationResult[1];
+ consumerProps.setProperty("ssl.keystore.location", _keyStorePath);
+ consumerProps.setProperty("ssl.keystore.password",
DEFAULT_KEYSTORE_PASSWORD);
+ consumerProps.setProperty("ssl.keystore.type", "PKCS12");
+ consumerProps.setProperty("ssl.key.password", "mykeypwd");
+ consumerProps.setProperty("stream.kafka.ssl.certificate.type", "X.509");
+ consumerProps.setProperty("stream.kafka.ssl.client.certificate",
clientCertificate);
+ consumerProps.setProperty("stream.kafka.ssl.client.key", privateKey);
+ consumerProps.setProperty("stream.kafka.ssl.client.key.algorithm", "RSA");
+ }
+
+ private String[] generateSelfSignedCertificate()
+ throws CertificateException, OperatorCreationException,
NoSuchAlgorithmException, NoSuchProviderException {
+ String[] certCreationResult = new String[2];
+ // Generate a key pair
+ KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA",
"BC");
+ keyPairGenerator.initialize(2048, new SecureRandom());
+ KeyPair keyPair = keyPairGenerator.generateKeyPair();
+ // set the private key into the result object
+ certCreationResult[0] =
Base64.toBase64String(keyPair.getPrivate().getEncoded());
+
+ // Validity of the certificate
+ Date notBefore = new Date();
+ Date notAfter = new Date(notBefore.getTime() + 7 * 24 * 60 * 60 * 1000L);
// 1 week
+
+ // Issuer and Subject DN
+ X500Name issuerName = new X500Name("CN=Test CA, O=Eng, OU=IT, L=Sunnyvale,
ST=CA, C=US");
+ X500Name subjectName = new X500Name("CN=Test User, O=Eng, OU=IT,
L=Sunnyvale, ST=CA, C=US");
+
+ // Serial Number
+ BigInteger serial = BigInteger.valueOf(System.currentTimeMillis());
+
+ // Create the certificate builder
+ X509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder(
+ issuerName,
+ serial,
+ notBefore,
+ notAfter,
+ subjectName,
+ keyPair.getPublic());
+
+ // Create a signer
+ ContentSigner signer = new
JcaContentSignerBuilder("SHA256withRSA").setProvider("BC").build(keyPair.getPrivate());
+
+ // Build the certificate
+ X509Certificate cert = new JcaX509CertificateConverter().setProvider("BC")
+ .getCertificate(certBuilder.build(signer));
+ // set the encoded certificate string into the result object
+ certCreationResult[1] = Base64.toBase64String(cert.getEncoded());
+ return certCreationResult;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]