snleee commented on code in PR #12249: URL: https://github.com/apache/pinot/pull/12249#discussion_r1451843966
########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyFactory; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.Arrays; +import java.util.Base64; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SSL utils class which helps in initialization of Kafka client SSL configuration. The class can install the + * provided server certificate enabling one-way SSL or it can install the server certificate and the + * client certificates enabling two-way SSL. + */ +public class KafkaSSLUtils { + + private KafkaSSLUtils() { + // private on purpose + } + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSSLUtils.class); + // Value constants + private static final String DEFAULT_CERTIFICATE_TYPE = "X.509"; + private static final String DEFAULT_KEY_ALGORITHM = "RSA"; + private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12"; + private static final String DEFAULT_SECURITY_PROTOCOL = "SSL"; + private static final String DEFAULT_TRUSTSTORE_TYPE = "jks"; + private static final String DEFAULT_SERVER_ALIAS = "ServerAlias"; + private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias"; + // Key constants + private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; + private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; + private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; + private static final String SSL_KEY_PASSWORD = "ssl.key.password"; + private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = "stream.kafka.ssl.server.certificate"; + private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = "stream.kafka.ssl.certificate.type"; + private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type"; + private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = "stream.kafka.ssl.client.certificate"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY = "stream.kafka.ssl.client.key"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = "stream.kafka.ssl.client.key.algorithm"; + private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type"; + + public static void initSSL(Properties consumerProps) { + // Check if one-way SSL is enabled. In this scenario, the client validates the server certificate. + String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION); + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword)) { + return; + } + renewTrustStore(consumerProps); + initTrustStore(consumerProps); + + // Set the security protocol + String securityProtocol = consumerProps.getProperty(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL); + consumerProps.setProperty(SECURITY_PROTOCOL, securityProtocol); + + // Check if two-way SSL is enabled. In this scenario, the client validates the server's certificate and the server + // validates the client's certificate. + String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION); + String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD); + String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD); + if (StringUtils.isAnyEmpty(keyStoreLocation, keyStorePassword, keyPassword)) { + return; + } + renewKeyStore(consumerProps); + initKeyStore(consumerProps); + } + + @VisibleForTesting + static void initTrustStore(Properties consumerProps) { + // Create the trust store if it does not exist + Path trustStorePath = getTrustStorePath(consumerProps); + if (Files.exists(trustStorePath)) { + return; + } + + try { + // Create the trust store path + createFile(trustStorePath); + } catch (FileAlreadyExistsException fex) { + return; Review Comment: I think that we should log some error here? ########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyFactory; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.Arrays; +import java.util.Base64; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SSL utils class which helps in initialization of Kafka client SSL configuration. The class can install the + * provided server certificate enabling one-way SSL or it can install the server certificate and the + * client certificates enabling two-way SSL. + */ +public class KafkaSSLUtils { + + private KafkaSSLUtils() { + // private on purpose + } + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSSLUtils.class); + // Value constants + private static final String DEFAULT_CERTIFICATE_TYPE = "X.509"; + private static final String DEFAULT_KEY_ALGORITHM = "RSA"; + private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12"; + private static final String DEFAULT_SECURITY_PROTOCOL = "SSL"; + private static final String DEFAULT_TRUSTSTORE_TYPE = "jks"; + private static final String DEFAULT_SERVER_ALIAS = "ServerAlias"; + private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias"; + // Key constants + private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; + private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; + private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; + private static final String SSL_KEY_PASSWORD = "ssl.key.password"; + private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = "stream.kafka.ssl.server.certificate"; + private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = "stream.kafka.ssl.certificate.type"; + private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type"; + private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = "stream.kafka.ssl.client.certificate"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY = "stream.kafka.ssl.client.key"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = "stream.kafka.ssl.client.key.algorithm"; + private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type"; + + public static void initSSL(Properties consumerProps) { + // Check if one-way SSL is enabled. In this scenario, the client validates the server certificate. + String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION); + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword)) { + return; + } + renewTrustStore(consumerProps); + initTrustStore(consumerProps); + + // Set the security protocol + String securityProtocol = consumerProps.getProperty(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL); + consumerProps.setProperty(SECURITY_PROTOCOL, securityProtocol); + + // Check if two-way SSL is enabled. In this scenario, the client validates the server's certificate and the server + // validates the client's certificate. + String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION); + String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD); + String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD); + if (StringUtils.isAnyEmpty(keyStoreLocation, keyStorePassword, keyPassword)) { + return; Review Comment: same here ########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyFactory; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.Arrays; +import java.util.Base64; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SSL utils class which helps in initialization of Kafka client SSL configuration. The class can install the + * provided server certificate enabling one-way SSL or it can install the server certificate and the + * client certificates enabling two-way SSL. + */ +public class KafkaSSLUtils { + + private KafkaSSLUtils() { + // private on purpose + } + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSSLUtils.class); + // Value constants + private static final String DEFAULT_CERTIFICATE_TYPE = "X.509"; + private static final String DEFAULT_KEY_ALGORITHM = "RSA"; + private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12"; + private static final String DEFAULT_SECURITY_PROTOCOL = "SSL"; + private static final String DEFAULT_TRUSTSTORE_TYPE = "jks"; + private static final String DEFAULT_SERVER_ALIAS = "ServerAlias"; + private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias"; + // Key constants + private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; + private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; + private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; + private static final String SSL_KEY_PASSWORD = "ssl.key.password"; + private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = "stream.kafka.ssl.server.certificate"; + private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = "stream.kafka.ssl.certificate.type"; + private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type"; + private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = "stream.kafka.ssl.client.certificate"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY = "stream.kafka.ssl.client.key"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = "stream.kafka.ssl.client.key.algorithm"; + private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type"; + + public static void initSSL(Properties consumerProps) { + // Check if one-way SSL is enabled. In this scenario, the client validates the server certificate. + String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION); + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword)) { + return; Review Comment: Can we add some info message here that we are skipping SSL initialization and its reason? ########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyFactory; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.Arrays; +import java.util.Base64; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SSL utils class which helps in initialization of Kafka client SSL configuration. The class can install the + * provided server certificate enabling one-way SSL or it can install the server certificate and the + * client certificates enabling two-way SSL. + */ +public class KafkaSSLUtils { + + private KafkaSSLUtils() { + // private on purpose + } + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSSLUtils.class); + // Value constants + private static final String DEFAULT_CERTIFICATE_TYPE = "X.509"; + private static final String DEFAULT_KEY_ALGORITHM = "RSA"; + private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12"; + private static final String DEFAULT_SECURITY_PROTOCOL = "SSL"; + private static final String DEFAULT_TRUSTSTORE_TYPE = "jks"; + private static final String DEFAULT_SERVER_ALIAS = "ServerAlias"; + private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias"; + // Key constants + private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; + private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; + private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; + private static final String SSL_KEY_PASSWORD = "ssl.key.password"; + private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = "stream.kafka.ssl.server.certificate"; + private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = "stream.kafka.ssl.certificate.type"; + private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type"; + private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = "stream.kafka.ssl.client.certificate"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY = "stream.kafka.ssl.client.key"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = "stream.kafka.ssl.client.key.algorithm"; + private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type"; + + public static void initSSL(Properties consumerProps) { + // Check if one-way SSL is enabled. In this scenario, the client validates the server certificate. + String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION); + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword)) { + return; + } + renewTrustStore(consumerProps); + initTrustStore(consumerProps); + + // Set the security protocol + String securityProtocol = consumerProps.getProperty(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL); + consumerProps.setProperty(SECURITY_PROTOCOL, securityProtocol); + + // Check if two-way SSL is enabled. In this scenario, the client validates the server's certificate and the server + // validates the client's certificate. + String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION); + String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD); + String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD); + if (StringUtils.isAnyEmpty(keyStoreLocation, keyStorePassword, keyPassword)) { + return; + } + renewKeyStore(consumerProps); Review Comment: I think that it's probably better to represent the logic as: ``` if (shouldRenewKeyStore()) { initKeyStore(consumerProps); // delete and recreate the key store file } ``` ########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java: ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import java.io.FileInputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.SecureRandom; +import java.security.Security; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Date; +import java.util.Enumeration; +import java.util.Properties; +import java.util.UUID; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.OperatorCreationException; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; +import org.bouncycastle.util.encoders.Base64; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class KafkaSSLUtilsTest { Review Comment: Can we add the test to check the backward compatibility? ########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyFactory; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.Arrays; +import java.util.Base64; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SSL utils class which helps in initialization of Kafka client SSL configuration. The class can install the + * provided server certificate enabling one-way SSL or it can install the server certificate and the + * client certificates enabling two-way SSL. + */ +public class KafkaSSLUtils { + + private KafkaSSLUtils() { + // private on purpose + } + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSSLUtils.class); + // Value constants + private static final String DEFAULT_CERTIFICATE_TYPE = "X.509"; + private static final String DEFAULT_KEY_ALGORITHM = "RSA"; + private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12"; + private static final String DEFAULT_SECURITY_PROTOCOL = "SSL"; + private static final String DEFAULT_TRUSTSTORE_TYPE = "jks"; + private static final String DEFAULT_SERVER_ALIAS = "ServerAlias"; + private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias"; + // Key constants + private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; + private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; + private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; + private static final String SSL_KEY_PASSWORD = "ssl.key.password"; + private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = "stream.kafka.ssl.server.certificate"; + private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = "stream.kafka.ssl.certificate.type"; + private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type"; + private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = "stream.kafka.ssl.client.certificate"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY = "stream.kafka.ssl.client.key"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = "stream.kafka.ssl.client.key.algorithm"; + private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type"; + + public static void initSSL(Properties consumerProps) { + // Check if one-way SSL is enabled. In this scenario, the client validates the server certificate. + String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION); + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword)) { + return; + } + renewTrustStore(consumerProps); + initTrustStore(consumerProps); + + // Set the security protocol + String securityProtocol = consumerProps.getProperty(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL); + consumerProps.setProperty(SECURITY_PROTOCOL, securityProtocol); + + // Check if two-way SSL is enabled. In this scenario, the client validates the server's certificate and the server + // validates the client's certificate. + String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION); + String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD); + String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD); + if (StringUtils.isAnyEmpty(keyStoreLocation, keyStorePassword, keyPassword)) { + return; + } + renewKeyStore(consumerProps); + initKeyStore(consumerProps); + } + + @VisibleForTesting + static void initTrustStore(Properties consumerProps) { + // Create the trust store if it does not exist + Path trustStorePath = getTrustStorePath(consumerProps); + if (Files.exists(trustStorePath)) { + return; + } + + try { + // Create the trust store path + createFile(trustStorePath); + } catch (FileAlreadyExistsException fex) { + return; + } catch (IOException iex) { + throw new RuntimeException(String.format("Failed to create the trust store path: %s", trustStorePath), iex); + } + + try { + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + String serverCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE); + String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE); + String trustStoreType = consumerProps.getProperty(SSL_TRUSTSTORE_TYPE, DEFAULT_TRUSTSTORE_TYPE); + consumerProps.setProperty(SSL_TRUSTSTORE_TYPE, trustStoreType); + + // Decode the Base64 string + byte[] certBytes = Base64.getDecoder().decode(serverCertificate); + InputStream certInputStream = new ByteArrayInputStream(certBytes); + + // Create a Certificate object + CertificateFactory certificateFactory = CertificateFactory.getInstance(certificateType); + Certificate certificate = certificateFactory.generateCertificate(certInputStream); + + // Create a TrustStore and load the default TrustStore + KeyStore trustStore = KeyStore.getInstance(trustStoreType); + + // Initialize the TrustStore + trustStore.load(null, null); + + // Add the server certificate to the truststore + trustStore.setCertificateEntry(DEFAULT_SERVER_ALIAS, certificate); + + // Save the keystore to a file + try (FileOutputStream fos = new FileOutputStream(trustStorePath.toString())) { + trustStore.store(fos, trustStorePassword.toCharArray()); + } + LOGGER.info("Initialized the SSL trust store."); + } catch (Exception ex) { + throw new RuntimeException("Error initializing the SSL trust store", ex); + } + } + + @VisibleForTesting + static void initKeyStore(Properties consumerProps) { + // Create the key store if it does not exist + Path keyStorePath = getKeyStorePath(consumerProps); + if (Files.exists(keyStorePath)) { + return; + } + + try { + // Create the key store path + createFile(keyStorePath); + } catch (FileAlreadyExistsException fex) { + return; + } catch (IOException iex) { + throw new RuntimeException(String.format("Failed to create the key store path: %s", keyStorePath), iex); + } + + String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD); + String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD); + String clientCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE); + String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE); + String privateKeyString = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY); + String privateKeyAlgorithm = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM, + DEFAULT_KEY_ALGORITHM); + String keyStoreType = consumerProps.getProperty(SSL_KEYSTORE_TYPE, DEFAULT_KEYSTORE_TYPE); + consumerProps.setProperty(SSL_KEYSTORE_TYPE, keyStoreType); + + try { + // decode the private key and certificate into bytes + byte[] pkBytes = Base64.getDecoder().decode(privateKeyString); + byte[] certBytes = Base64.getDecoder().decode(clientCertificate); + + // Create the private key object + PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkBytes); + KeyFactory keyFactory = KeyFactory.getInstance(privateKeyAlgorithm); + PrivateKey privateKey = keyFactory.generatePrivate(keySpec); + + // Create the Certificate object + CertificateFactory certFactory = CertificateFactory.getInstance(certificateType); + InputStream certInputStream = new ByteArrayInputStream(certBytes); + Certificate certificate = certFactory.generateCertificate(certInputStream); + + // Create a KeyStore object and load a new empty keystore + KeyStore keyStore = KeyStore.getInstance(keyStoreType); + keyStore.load(null, null); + + // Add the key pair and certificate to the keystore + KeyStore.PrivateKeyEntry privateKeyEntry = new KeyStore.PrivateKeyEntry( + privateKey, new Certificate[]{certificate} + ); + KeyStore.PasswordProtection keyPasswordProtection = new KeyStore.PasswordProtection(keyPassword.toCharArray()); + keyStore.setEntry(DEFAULT_CLIENT_ALIAS, privateKeyEntry, keyPasswordProtection); + + // Save the keystore to the specified location + try (FileOutputStream fos = new FileOutputStream(keyStorePath.toString())) { + keyStore.store(fos, keyStorePassword.toCharArray()); + } + LOGGER.info("Initialized the SSL key store."); + } catch (Exception ex) { + throw new RuntimeException("Error initializing the SSL key store", ex); + } + } + + private static Path getTrustStorePath(Properties consumerProps) { + String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION); + return Paths.get(trustStoreLocation); + } + + private static Path getKeyStorePath(Properties consumerProps) { + String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION); + return Paths.get(keyStoreLocation); + } + + // Renew the trust store if needed. + private static void renewTrustStore(Properties consumerProps) { + boolean renewTrustStore; + Path trustStorePath = getTrustStorePath(consumerProps); + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + String serverCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE); + String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE); + + try { + // Load the trust store + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (FileInputStream fis = new FileInputStream(trustStorePath.toString())) { + trustStore.load(fis, trustStorePassword.toCharArray()); + } + + // Decode the provided certificate + byte[] decodedCertBytes = Base64.getDecoder().decode(serverCertificate); + CertificateFactory certFactory = CertificateFactory.getInstance(certificateType); + Certificate providedCertificate = certFactory.generateCertificate(new ByteArrayInputStream(decodedCertBytes)); + + // Get the certificate from the trust store + Certificate trustStoreCertificate = trustStore.getCertificate(DEFAULT_SERVER_ALIAS); + + // Compare the certificates + renewTrustStore = !providedCertificate.equals(trustStoreCertificate); + } catch (FileNotFoundException fex) { + // create the trust store if trust store does not exist – happens the very first time + renewTrustStore = true; + } catch (Exception ex) { + // renew trust store if comparison check fails + renewTrustStore = true; + LOGGER.warn("Trust store certificate comparison check failed.", ex); + } + + if (renewTrustStore && Files.exists(trustStorePath)) { + LOGGER.info("Renewing the SSL trust store"); + deleteFile(trustStorePath); + } + } + + // Renew the key store if needed. + private static void renewKeyStore(Properties consumerProps) { Review Comment: I think that the current code will delete the certificate file if `STREAM_KAFKA_SSL_CLIENT_CERTIFICATE` is null or empty while the other config. Are we keeping backward compatibility for tables who already have TLS configured manual way? (having trustore/key store copied and the table config includes the location & password). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
