snleee commented on code in PR #12249:
URL: https://github.com/apache/pinot/pull/12249#discussion_r1451842439


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java:
##########
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyFactory;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SSL utils class which helps in initialization of Kafka client SSL 
configuration. The class can install the
+ * provided server certificate enabling one-way SSL or it can install the 
server certificate and the
+ * client certificates enabling two-way SSL.
+ */
+public class KafkaSSLUtils {
+
+  private KafkaSSLUtils() {
+    // private on purpose
+  }
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaSSLUtils.class);
+  // Value constants
+  private static final String DEFAULT_CERTIFICATE_TYPE = "X.509";
+  private static final String DEFAULT_KEY_ALGORITHM = "RSA";
+  private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12";
+  private static final String DEFAULT_SECURITY_PROTOCOL = "SSL";
+  private static final String DEFAULT_TRUSTSTORE_TYPE = "jks";
+  private static final String DEFAULT_SERVER_ALIAS = "ServerAlias";
+  private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias";
+  // Key constants
+  private static final String SSL_TRUSTSTORE_LOCATION = 
"ssl.truststore.location";
+  private static final String SSL_TRUSTSTORE_PASSWORD = 
"ssl.truststore.password";
+  private static final String SECURITY_PROTOCOL = "security.protocol";
+  private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location";
+  private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password";
+  private static final String SSL_KEY_PASSWORD = "ssl.key.password";
+  private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = 
"stream.kafka.ssl.server.certificate";
+  private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = 
"stream.kafka.ssl.certificate.type";
+  private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type";
+  private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = 
"stream.kafka.ssl.client.certificate";
+  private static final String STREAM_KAFKA_SSL_CLIENT_KEY = 
"stream.kafka.ssl.client.key";
+  private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = 
"stream.kafka.ssl.client.key.algorithm";
+  private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type";
+
+  public static void initSSL(Properties consumerProps) {
+    // Check if one-way SSL is enabled. In this scenario, the client validates 
the server certificate.
+    String trustStoreLocation = 
consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION);
+    String trustStorePassword = 
consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD);
+    if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword)) {
+      return;
+    }
+    renewTrustStore(consumerProps);
+    initTrustStore(consumerProps);
+
+    // Set the security protocol
+    String securityProtocol = consumerProps.getProperty(SECURITY_PROTOCOL, 
DEFAULT_SECURITY_PROTOCOL);
+    consumerProps.setProperty(SECURITY_PROTOCOL, securityProtocol);
+
+    // Check if two-way SSL is enabled. In this scenario, the client validates 
the server's certificate and the server
+    // validates the client's certificate.
+    String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION);
+    String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD);
+    String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD);
+    if (StringUtils.isAnyEmpty(keyStoreLocation, keyStorePassword, 
keyPassword)) {

Review Comment:
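   Same here: can we add an info log message stating that two-way SSL (keystore) initialization is being skipped, and why?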



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java:
##########
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyFactory;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SSL utils class which helps in initialization of Kafka client SSL 
configuration. The class can install the
+ * provided server certificate enabling one-way SSL or it can install the 
server certificate and the
+ * client certificates enabling two-way SSL.
+ */
+public class KafkaSSLUtils {
+
+  private KafkaSSLUtils() {
+    // private on purpose
+  }
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaSSLUtils.class);
+  // Value constants
+  private static final String DEFAULT_CERTIFICATE_TYPE = "X.509";
+  private static final String DEFAULT_KEY_ALGORITHM = "RSA";
+  private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12";
+  private static final String DEFAULT_SECURITY_PROTOCOL = "SSL";
+  private static final String DEFAULT_TRUSTSTORE_TYPE = "jks";
+  private static final String DEFAULT_SERVER_ALIAS = "ServerAlias";
+  private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias";
+  // Key constants
+  private static final String SSL_TRUSTSTORE_LOCATION = 
"ssl.truststore.location";
+  private static final String SSL_TRUSTSTORE_PASSWORD = 
"ssl.truststore.password";
+  private static final String SECURITY_PROTOCOL = "security.protocol";
+  private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location";
+  private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password";
+  private static final String SSL_KEY_PASSWORD = "ssl.key.password";
+  private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = 
"stream.kafka.ssl.server.certificate";
+  private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = 
"stream.kafka.ssl.certificate.type";
+  private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type";
+  private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = 
"stream.kafka.ssl.client.certificate";
+  private static final String STREAM_KAFKA_SSL_CLIENT_KEY = 
"stream.kafka.ssl.client.key";
+  private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = 
"stream.kafka.ssl.client.key.algorithm";
+  private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type";
+
+  public static void initSSL(Properties consumerProps) {
+    // Check if one-way SSL is enabled. In this scenario, the client validates 
the server certificate.
+    String trustStoreLocation = 
consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION);
+    String trustStorePassword = 
consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD);
+    if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword)) {

Review Comment:
   Can we add an info log message here stating that SSL initialization is being skipped, and the reason why?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to