This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b87b0ed  CASSANDRA-18545: Provide a SecretsProvider interface to 
abstract the secret provisioning
b87b0ed is described below

commit b87b0edd310d1ef93c507bbbb1ae51e1b0b319c6
Author: Francisco Guerrero <[email protected]>
AuthorDate: Tue May 23 13:56:48 2023 -0700

    CASSANDRA-18545: Provide a SecretsProvider interface to abstract the secret 
provisioning
    
    This commit introduces the SecretsProvider interface that abstracts the 
secrets provisioning.
    This way different implementations of the SecretsProvider can be used to 
provide SSL secrets
    for the Analytics job. We provide an implementation, 
SslConficSecretsProvider, which provides
    secrets based on the configuration for the job.
    
    Patch by Francisco Guerrero; Reviewed by Dinesh Joshi, Yifan Cai for 
CASSANDRA-18545
---
 .../java/org/apache/cassandra/clients/Sidecar.java |  17 +-
 .../apache/cassandra/secrets/SecretsProvider.java  | 108 +++++++++
 .../cassandra/{clients => secrets}/SslConfig.java  |  85 +++----
 .../secrets/SslConfigSecretsProvider.java          | 198 +++++++++++++++
 .../org/apache/cassandra/spark/KryoRegister.java   |   2 +-
 .../cassandra/spark/data/CassandraDataLayer.java   |   5 +-
 .../spark/data/CassandraDataSourceHelper.java      |   2 +-
 .../secrets/SslConfigSecretsProviderTest.java      | 270 +++++++++++++++++++++
 .../{clients => secrets}/SslConfigTest.java        |  14 +-
 .../cassandra/spark/KryoSerializationTests.java    |   2 +-
 .../data/CassandraDataSourceHelperCacheTest.java   |   2 +-
 .../data/partitioner/JDKSerializationTests.java    |   2 +-
 .../secrets/fakecerts/client-keystore-password     |   1 +
 .../secrets/fakecerts/client-keystore.p12          | Bin 0 -> 2576 bytes
 .../secrets/fakecerts/client-truststore-password   |   1 +
 .../secrets/fakecerts/client-truststore.jks        | Bin 0 -> 1094 bytes
 16 files changed, 639 insertions(+), 70 deletions(-)

diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 71d8f43..bac09e2 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import o.a.c.sidecar.client.shaded.io.vertx.core.Vertx;
 import o.a.c.sidecar.client.shaded.io.vertx.core.VertxOptions;
+import org.apache.cassandra.secrets.SecretsProvider;
 import org.apache.cassandra.sidecar.client.HttpClientConfig;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarConfig;
@@ -78,7 +79,7 @@ public final class Sidecar
 
     public static SidecarClient from(SidecarInstancesProvider 
sidecarInstancesProvider,
                                      ClientConfig config,
-                                     SslConfig sslConfig) throws IOException
+                                     SecretsProvider secretsProvider) throws 
IOException
     {
         Vertx vertx = Vertx.vertx(new 
VertxOptions().setUseDaemonThread(true).setWorkerPoolSize(config.maxPoolSize()));
 
@@ -90,16 +91,16 @@ public final class Sidecar
                 .maxChunkSize((int) config.maxBufferSize())
                 .userAgent(BuildInfo.READER_USER_AGENT);
 
-        if (sslConfig != null)
+        if (secretsProvider != null)
         {
             builder = builder
                     .ssl(true)
-                    .keyStoreInputStream(sslConfig.keyStoreInputStream())
-                    .keyStorePassword(sslConfig.keyStorePassword())
-                    .keyStoreType(sslConfig.keyStoreType())
-                    .trustStoreInputStream(sslConfig.trustStoreInputStream())
-                    .trustStorePassword(sslConfig.trustStorePassword())
-                    .trustStoreType(sslConfig.trustStoreType());
+                    .keyStoreInputStream(secretsProvider.keyStoreInputStream())
+                    
.keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
+                    .keyStoreType(secretsProvider.keyStoreType())
+                    
.trustStoreInputStream(secretsProvider.trustStoreInputStream())
+                    
.trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
+                    .trustStoreType(secretsProvider.trustStoreType());
         }
 
         HttpClientConfig httpClientConfig = builder.build();
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SecretsProvider.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SecretsProvider.java
new file mode 100644
index 0000000..a54b5eb
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SecretsProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.secrets;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Map;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A Secrets provider class
+ */
+public interface SecretsProvider
+{
+    /**
+     * Provides an initialization mechanism for the secrets after the factory 
has created the instance of
+     * the object by passing the {@code environmentOptions}.
+     *
+     * @param environmentOptions the environment options
+     */
+    void initialize(@NotNull Map<String, String> environmentOptions);
+
+    /**
+     * @return true if the keystore secrets have been provided, false otherwise
+     */
+    boolean hasKeyStoreSecrets();
+
+    /**
+     * @return an input stream to the keystore source
+     * @throws IOException when an IO exception occurs
+     */
+    InputStream keyStoreInputStream() throws IOException;
+
+    /**
+     * @return a character array for the keystore password
+     */
+    char[] keyStorePassword();
+
+    /**
+     * @return the keystore type
+     */
+    default String keyStoreType()
+    {
+        return "PKCS12";
+    }
+
+    /**
+     * @return true if the truststore secrets have been provided, false 
otherwise
+     */
+    boolean hasTrustStoreSecrets();
+
+    /**
+     * @return an input stream to the truststore source
+     * @throws IOException when an IO exception occurs
+     */
+    InputStream trustStoreInputStream() throws IOException;
+
+    /**
+     * @return a character array for the truststore password
+     */
+    char[] trustStorePassword();
+
+    /**
+     * @return the truststore type
+     */
+    default String trustStoreType()
+    {
+        return "JKS";
+    }
+
+    /**
+     * Validates the mutual TLS secrets
+     */
+    void validateMutualTLS();
+
+    /**
+     * Returns a secret that is found under the {@code secretName}. This name 
would correspond for example
+     * to a file name under the secrets directory, or to an environment 
variable that contains the secret name.
+     *
+     * @param secretName the name of the secret in the underlying secret 
configuration.
+     * @return a secret that is found under the {@code secretName}
+     */
+    String secretByName(String secretName);
+
+    /**
+     * @return the path for the secrets directory
+     */
+    Path secretsPath();
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SslConfig.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfig.java
similarity index 90%
rename from 
cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SslConfig.java
rename to 
cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfig.java
index 2fd72b9..e2b7fbd 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SslConfig.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfig.java
@@ -17,17 +17,12 @@
  * under the License.
  */
 
-package org.apache.cassandra.clients;
+package org.apache.cassandra.secrets;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Base64;
 import java.util.Map;
 
 import org.slf4j.Logger;
@@ -46,6 +41,7 @@ public class SslConfig implements Serializable
 {
     private static final long serialVersionUID = -3844712192096436932L;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SslConfig.class);
+    public static final String SECRETS_PATH = "SECRETS_PATH";
     public static final String KEYSTORE_PATH = "KEYSTORE_PATH";
     public static final String KEYSTORE_BASE64_ENCODED = 
"KEYSTORE_BASE64_ENCODED";
     public static final String KEYSTORE_PASSWORD = "KEYSTORE_PASSWORD";
@@ -56,6 +52,8 @@ public class SslConfig implements Serializable
     public static final String TRUSTSTORE_PASSWORD = "TRUSTSTORE_PASSWORD";
     public static final String TRUSTSTORE_TYPE = "TRUSTSTORE_TYPE";
     public static final String DEFAULT_TRUSTSTORE_TYPE = "JKS";
+
+    protected String secretsPath;
     protected String keyStorePath;
     protected String base64EncodedKeyStore;
     protected String keyStorePassword;
@@ -67,6 +65,7 @@ public class SslConfig implements Serializable
 
     protected SslConfig(Builder<?> builder)
     {
+        secretsPath = builder.secretsPath;
         keyStorePath = builder.keyStorePath;
         base64EncodedKeyStore = builder.base64EncodedKeyStore;
         keyStorePassword = builder.keyStorePassword;
@@ -78,24 +77,11 @@ public class SslConfig implements Serializable
     }
 
     /**
-     * @return an input stream to the keystore source
-     * @throws IOException when an IO exception occurs
+     * @return the path to the secrets directory
      */
-    public InputStream keyStoreInputStream() throws IOException
+    public String secretsPath()
     {
-        if (keyStorePath != null)
-        {
-            return Files.newInputStream(Paths.get(keyStorePath));
-        }
-        else if (base64EncodedKeyStore != null)
-        {
-            return new 
ByteArrayInputStream(Base64.getDecoder().decode(base64EncodedKeyStore));
-        }
-        else
-        {
-            // It should never reach here
-            throw new RuntimeException("keyStorePath or encodedKeyStore must 
be provided");
-        }
+        return secretsPath;
     }
 
     /**
@@ -130,23 +116,6 @@ public class SslConfig implements Serializable
         return keyStoreType != null ? keyStoreType : DEFAULT_KEYSTORE_TYPE;
     }
 
-    /**
-     * @return an input stream to the truststore source
-     * @throws IOException when an IO exception occurs
-     */
-    public InputStream trustStoreInputStream() throws IOException
-    {
-        if (trustStorePath != null)
-        {
-            return Files.newInputStream(Paths.get(trustStorePath));
-        }
-        else if (base64EncodedTrustStore != null)
-        {
-            return new 
ByteArrayInputStream(Base64.getDecoder().decode(base64EncodedTrustStore));
-        }
-        return null;
-    }
-
     /**
      * @return the path to the trust store
      */
@@ -184,6 +153,7 @@ public class SslConfig implements Serializable
     private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException
     {
         LOGGER.warn("Falling back to JDK deserialization");
+        this.secretsPath = readNullableString(in);
         this.keyStorePath = readNullableString(in);
         this.base64EncodedKeyStore = readNullableString(in);
         this.keyStorePassword = readNullableString(in);
@@ -197,6 +167,7 @@ public class SslConfig implements Serializable
     private void writeObject(ObjectOutputStream out) throws IOException
     {
         LOGGER.warn("Falling back to JDK serialization");
+        writeNullableString(out, secretsPath);
         writeNullableString(out, keyStorePath);
         writeNullableString(out, base64EncodedKeyStore);
         writeNullableString(out, keyStorePassword);
@@ -232,6 +203,7 @@ public class SslConfig implements Serializable
         public SslConfig read(Kryo kryo, Input in, Class type)
         {
             return new Builder<>()
+                   .secretsPath(in.readString())
                    .keyStorePath(in.readString())
                    .base64EncodedKeyStore(in.readString())
                    .keyStorePassword(in.readString())
@@ -245,6 +217,7 @@ public class SslConfig implements Serializable
 
         public void write(Kryo kryo, Output out, SslConfig config)
         {
+            out.writeString(config.secretsPath);
             out.writeString(config.keyStorePath);
             out.writeString(config.base64EncodedKeyStore);
             out.writeString(config.keyStorePassword);
@@ -259,6 +232,7 @@ public class SslConfig implements Serializable
     @Nullable
     public static SslConfig create(Map<String, String> options)
     {
+        String secretsPath = MapUtils.getOrDefault(options, SECRETS_PATH, 
null);
         String keyStorePath = MapUtils.getOrDefault(options, KEYSTORE_PATH, 
null);
         String encodedKeyStore = MapUtils.getOrDefault(options, 
KEYSTORE_BASE64_ENCODED, null);
         String keyStorePassword = MapUtils.getOrDefault(options, 
KEYSTORE_PASSWORD, null);
@@ -269,16 +243,18 @@ public class SslConfig implements Serializable
         String trustStoreType = MapUtils.getOrDefault(options, 
TRUSTSTORE_TYPE, null);
 
         // If any of the values are provided we try to create a valid 
SecretsConfig object
-        if (keyStorePath != null
-                || encodedKeyStore != null
-                || keyStorePassword != null
-                || keyStoreType != null
-                || trustStorePath != null
-                || encodedTrustStore != null
-                || trustStorePassword != null
-                || trustStoreType != null)
+        if (secretsPath != null
+            || keyStorePath != null
+            || encodedKeyStore != null
+            || keyStorePassword != null
+            || keyStoreType != null
+            || trustStorePath != null
+            || encodedTrustStore != null
+            || trustStorePassword != null
+            || trustStoreType != null)
         {
             Builder<?> validatedConfig = new Builder<>()
+                                         .secretsPath(secretsPath)
                                          .keyStorePath(keyStorePath)
                                          
.base64EncodedKeyStore(encodedKeyStore)
                                          .keyStorePassword(keyStorePassword)
@@ -303,6 +279,7 @@ public class SslConfig implements Serializable
      */
     public static class Builder<T extends SslConfig.Builder<T>>
     {
+        protected String secretsPath;
         protected String keyStorePath;
         protected String base64EncodedKeyStore;
         protected String keyStorePassword;
@@ -321,6 +298,18 @@ public class SslConfig implements Serializable
             return (T) this;
         }
 
+        /**
+         * Sets the {@code secretsPath} and returns a reference to this 
Builder enabling method chaining
+         *
+         * @param secretsPath the {@code secretsPath} to set
+         * @return a reference to this Builder
+         */
+        public T secretsPath(String secretsPath)
+        {
+            this.secretsPath = secretsPath;
+            return self();
+        }
+
         /**
          * Sets the {@code keyStorePath} and returns a reference to this 
Builder enabling method chaining
          *
@@ -470,7 +459,7 @@ public class SslConfig implements Serializable
                 if (trustStorePath == null && base64EncodedTrustStore == null)
                 {
                     throw new IllegalArgumentException(
-                            String.format("One of the '%s' or '%s' options 
must be provided when the '%s' option is provided",
+                           String.format("One of the '%s' or '%s' options must 
be provided when the '%s' option is provided",
                                           TRUSTSTORE_PATH, 
TRUSTSTORE_BASE64_ENCODED, TRUSTSTORE_PASSWORD));
                 }
             }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfigSecretsProvider.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfigSecretsProvider.java
new file mode 100644
index 0000000..dfdb8c0
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfigSecretsProvider.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.secrets;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A {@link SecretsProvider} implementation based on the SSL configuration.
+ */
+public class SslConfigSecretsProvider implements SecretsProvider
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SslConfigSecretsProvider.class);
+    private final SslConfig config;
+
+    public SslConfigSecretsProvider(SslConfig config)
+    {
+        this.config = config;
+    }
+
+    @Override
+    public void initialize(@NotNull Map<String, String> environmentOptions)
+    {
+        // Do nothing
+    }
+
+    @Override
+    public boolean hasKeyStoreSecrets()
+    {
+        String keyStorePath = config.keyStorePath();
+        if (keyStorePath != null)
+        {
+            return Files.exists(Paths.get(keyStorePath)) && keyStorePassword() 
!= null;
+        }
+        else
+        {
+            return config.base64EncodedKeyStore() != null && 
!config.base64EncodedKeyStore().isEmpty();
+        }
+    }
+
+    @Override
+    public InputStream keyStoreInputStream() throws IOException
+    {
+        if (config.keyStorePath() != null)
+        {
+            return Files.newInputStream(Paths.get(config.keyStorePath()));
+        }
+        else if (config.base64EncodedKeyStore() != null)
+        {
+            return new 
ByteArrayInputStream(Base64.getDecoder().decode(config.base64EncodedKeyStore()));
+        }
+        // it should never reach here
+        throw new RuntimeException("keyStorePath or encodedKeyStore must be 
provided");
+    }
+
+    @Override
+    public char[] keyStorePassword()
+    {
+        return Objects.requireNonNull(config.keyStorePassword(), 
"keyStorePassword must be not null").toCharArray();
+    }
+
+    @Override
+    public String keyStoreType()
+    {
+        return config.keyStoreType();
+    }
+
+    @Override
+    public boolean hasTrustStoreSecrets()
+    {
+        String trustStorePath = config.trustStorePath();
+        if (trustStorePath != null)
+        {
+            return Files.exists(Paths.get(trustStorePath)) && 
trustStorePassword() != null;
+        }
+        return config.base64EncodedTrustStore() != null && 
!config.base64EncodedTrustStore().isEmpty();
+    }
+
+    @Override
+    public InputStream trustStoreInputStream() throws IOException
+    {
+        if (config.trustStorePath() != null)
+        {
+            return Files.newInputStream(Paths.get(config.trustStorePath()));
+        }
+        else if (config.base64EncodedTrustStore() != null)
+        {
+            return new 
ByteArrayInputStream(Base64.getDecoder().decode(config.base64EncodedTrustStore()));
+        }
+        return null;
+    }
+
+    @Override
+    public char[] trustStorePassword()
+    {
+        String password = config.trustStorePassword();
+        return password != null ? password.toCharArray() : null;
+    }
+
+    @Override
+    public String trustStoreType()
+    {
+        return config.trustStoreType();
+    }
+
+    @Override
+    public void validateMutualTLS()
+    {
+        boolean fail = false;
+
+        String keyStorePath = config.keyStorePath();
+        if (keyStorePath != null)
+        {
+            if (!Files.exists(Paths.get(keyStorePath)))
+            {
+                LOGGER.warn("Provided keystore path option does not exist in 
the file system keystorePath={}", keyStorePath);
+                fail = true;
+            }
+        }
+        else if (config.base64EncodedKeyStore() == null || 
config.base64EncodedKeyStore().isEmpty())
+        {
+            LOGGER.warn("Neither keystore path or encoded keystore options 
were provided");
+            fail = true;
+        }
+
+        if (keyStorePassword() == null)
+        {
+            LOGGER.warn("No keystore password option provided");
+        }
+
+        String trustStorePath = config.trustStorePath();
+        if (trustStorePath != null && !Files.exists(Paths.get(trustStorePath)))
+        {
+            LOGGER.warn("Provided truststore path option does not exist in the 
file system trustStorePath={}", trustStorePath);
+            fail = true;
+        }
+
+        if ((trustStorePath != null || config.base64EncodedTrustStore() != 
null) && trustStorePassword() == null)
+        {
+            LOGGER.warn("No truststore password option provided");
+            fail = true;
+        }
+
+        if (fail)
+        {
+            throw new RuntimeException("No valid keystore/password provided in 
options");
+        }
+    }
+
+    @Override
+    public String secretByName(String secretName)
+    {
+        throw new UnsupportedOperationException("Currently not supported");
+    }
+
+    @Override
+    public Path secretsPath()
+    {
+        return config.secretsPath() != null ? Paths.get(config.secretsPath()) 
: null;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SecretsConfigProvider{"
+               + "config=" + config
+               + '}';
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
index d00b705..54d14bf 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.bridge.BigNumberConfigImpl;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.clients.SidecarInstanceImpl;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
 import org.apache.cassandra.spark.data.CassandraDataLayer;
 import org.apache.cassandra.spark.data.LocalDataLayer;
 import org.apache.cassandra.spark.data.ReplicationFactor;
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 259e468..fdc51bc 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -69,7 +69,8 @@ import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.clients.ExecutorHolder;
 import org.apache.cassandra.clients.Sidecar;
 import org.apache.cassandra.clients.SidecarInstanceImpl;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
@@ -390,7 +391,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements Serializ
         {
             sidecar = Sidecar.from(new SimpleSidecarInstancesProvider(new 
ArrayList<>(clusterConfig)),
                                    sidecarClientConfig,
-                                   sslConfig);
+                                   new SslConfigSecretsProvider(sslConfig));
         }
         catch (IOException ioException)
         {
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
index 5617c05..7f00e0a 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.clients.Sidecar;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
 import org.apache.cassandra.spark.utils.MapUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigSecretsProviderTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigSecretsProviderTest.java
new file mode 100644
index 0000000..a56d8e1
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigSecretsProviderTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.secrets;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SslConfigSecretsProviderTest
+{
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    String keyStorePassword;
+    String trustStorePassword;
+
+    @Before
+    public void setup() throws IOException
+    {
+        Path path = folder.newFolder("secrets-config").toPath();
+
+        try (InputStream stream = 
getClass().getResourceAsStream("/secrets/fakecerts/client-keystore.p12"))
+        {
+            Files.copy(Objects.requireNonNull(stream), 
path.resolve("keystore.p12"), StandardCopyOption.REPLACE_EXISTING);
+        }
+
+        try (InputStream stream = 
getClass().getResourceAsStream("/secrets/fakecerts/client-truststore.jks"))
+        {
+            Files.copy(Objects.requireNonNull(stream), 
path.resolve("truststore.jks"), StandardCopyOption.REPLACE_EXISTING);
+        }
+
+        keyStorePassword = 
readPassword("/secrets/fakecerts/client-keystore-password");
+        trustStorePassword = 
readPassword("/secrets/fakecerts/client-truststore-password");
+    }
+
+    @Test
+    public void testInitializationSucceeds()
+    {
+        SslConfig config = new SslConfig.Builder<>().build();
+        SecretsProvider secretsConfigProvider = new 
SslConfigSecretsProvider(config);
+        assertNotNull(secretsConfigProvider);
+    }
+
+    @Test
+    public void testSuccessWithCertsPaths() throws IOException
+    {
+        Map<String, String> jobOptions = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        jobOptions.put(SslConfig.KEYSTORE_PATH, 
folder.getRoot().getAbsolutePath() + "/secrets-config/keystore.p12");
+        jobOptions.put(SslConfig.KEYSTORE_PASSWORD, keyStorePassword);
+        jobOptions.put(SslConfig.KEYSTORE_TYPE, "PKCS12");
+        jobOptions.put(SslConfig.TRUSTSTORE_PATH, 
folder.getRoot().getAbsolutePath() + "/secrets-config/truststore.jks");
+        jobOptions.put(SslConfig.TRUSTSTORE_PASSWORD, trustStorePassword);
+        jobOptions.put(SslConfig.TRUSTSTORE_TYPE, "JKS");
+
+        SslConfig config = SslConfig.create(jobOptions);
+        SecretsProvider provider = new SslConfigSecretsProvider(config);
+        provider.initialize(Collections.emptyMap());
+
+        assertTrue(provider.hasKeyStoreSecrets());
+        long totalRead = fullyReadStream(provider.keyStoreInputStream()); // 
Read keyStore
+        assertTrue(totalRead > 0);
+        assertEquals("PKCS12", provider.keyStoreType());
+        assertEquals(keyStorePassword, new 
String(provider.keyStorePassword()));
+        assertTrue(provider.hasTrustStoreSecrets());
+        totalRead = fullyReadStream(provider.trustStoreInputStream()); // Read 
trustStore
+        assertTrue(totalRead > 0);
+        assertEquals("JKS", provider.trustStoreType());
+        assertEquals(trustStorePassword, new 
String(provider.trustStorePassword()));
+        provider.validateMutualTLS();
+
+        try
+        {
+            buildSslConfig(provider);
+        }
+        catch (GeneralSecurityException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testSuccessWithEncodedCerts() throws IOException
+    {
+        Map<String, String> jobOptions = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        String keyStorePath = folder.getRoot().getAbsolutePath() + 
"/secrets-config/keystore.p12";
+        String encodedKeyStore = 
Base64.getEncoder().encodeToString(Files.readAllBytes(Paths.get(keyStorePath)));
+        String trustStorePath = folder.getRoot().getAbsolutePath() + 
"/secrets-config/truststore.jks";
+        String encodedTrustStore = 
Base64.getEncoder().encodeToString(Files.readAllBytes(Paths.get(trustStorePath)));
+
+        jobOptions.put(SslConfig.KEYSTORE_BASE64_ENCODED, encodedKeyStore);
+        jobOptions.put(SslConfig.KEYSTORE_PASSWORD, keyStorePassword);
+        jobOptions.put(SslConfig.KEYSTORE_TYPE, "PKCS12");
+        jobOptions.put(SslConfig.TRUSTSTORE_BASE64_ENCODED, encodedTrustStore);
+        jobOptions.put(SslConfig.TRUSTSTORE_PASSWORD, trustStorePassword);
+        jobOptions.put(SslConfig.TRUSTSTORE_TYPE, "JKS");
+
+        SslConfig config = SslConfig.create(jobOptions);
+        SecretsProvider provider = new SslConfigSecretsProvider(config);
+        provider.initialize(Collections.emptyMap());
+
+        assertTrue(provider.hasKeyStoreSecrets());
+        long totalRead = fullyReadStream(provider.keyStoreInputStream()); // 
Read keyStore
+        assertTrue(totalRead > 0);
+        assertEquals("PKCS12", provider.keyStoreType());
+        assertEquals(keyStorePassword, new 
String(provider.keyStorePassword()));
+        assertTrue(provider.hasTrustStoreSecrets());
+        totalRead = fullyReadStream(provider.trustStoreInputStream()); // Read 
trustStore
+        assertTrue(totalRead > 0);
+        assertEquals("JKS", provider.trustStoreType());
+        assertEquals(trustStorePassword, new 
String(provider.trustStorePassword()));
+        provider.validateMutualTLS();
+
+        try
+        {
+            buildSslConfig(provider);
+        }
+        catch (GeneralSecurityException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidPathToKeyStore()
+    {
+        Map<String, String> jobOptions = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        jobOptions.put(SslConfig.KEYSTORE_PATH, 
folder.getRoot().getAbsolutePath() + 
"/secrets-config/non-existent-keystore.p12");
+        jobOptions.put(SslConfig.KEYSTORE_PASSWORD, keyStorePassword);
+        jobOptions.put(SslConfig.KEYSTORE_TYPE, "PKCS12");
+        jobOptions.put(SslConfig.TRUSTSTORE_PATH, 
folder.getRoot().getAbsolutePath() + "/secrets-config/truststore.jks");
+        jobOptions.put(SslConfig.TRUSTSTORE_PASSWORD, trustStorePassword);
+        jobOptions.put(SslConfig.TRUSTSTORE_TYPE, "JKS");
+
+        SslConfig config = SslConfig.create(jobOptions);
+        SecretsProvider provider = new SslConfigSecretsProvider(config);
+        provider.initialize(Collections.emptyMap());
+
+        assertFalse(provider.hasKeyStoreSecrets());
+        try
+        {
+            provider.validateMutualTLS();
+            fail("it should fail when the path is invalid");
+        }
+        catch (RuntimeException e)
+        {
+            assertEquals("No valid keystore/password provided in options", 
e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidPathToTrustStore()
+    {
+        Map<String, String> jobOptions = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        jobOptions.put(SslConfig.KEYSTORE_PATH, 
folder.getRoot().getAbsolutePath() + "/secrets-config/keystore.p12");
+        jobOptions.put(SslConfig.KEYSTORE_PASSWORD, keyStorePassword);
+        jobOptions.put(SslConfig.KEYSTORE_TYPE, "PKCS12");
+        jobOptions.put(SslConfig.TRUSTSTORE_PATH, 
folder.getRoot().getAbsolutePath() + 
"/secrets-config/non-existent-truststore.jks");
+        jobOptions.put(SslConfig.TRUSTSTORE_PASSWORD, trustStorePassword);
+        jobOptions.put(SslConfig.TRUSTSTORE_TYPE, "JKS");
+
+        SslConfig config = SslConfig.create(jobOptions);
+        SecretsProvider provider = new SslConfigSecretsProvider(config);
+        provider.initialize(Collections.emptyMap());
+
+        assertFalse(provider.hasTrustStoreSecrets());
+        try
+        {
+            provider.validateMutualTLS();
+            fail("it should fail when the path is invalid");
+        }
+        catch (RuntimeException e)
+        {
+            assertEquals("No valid keystore/password provided in options", 
e.getMessage());
+        }
+    }
+
+    private void buildSslConfig(SecretsProvider provider) throws IOException, 
GeneralSecurityException
+    {
+        provider.validateMutualTLS();
+
+        KeyManagerFactory kmf = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        KeyStore keystore = KeyStore.getInstance(provider.keyStoreType());
+        char[] keyStorePassAr = provider.keyStorePassword();
+
+        if (keyStorePassAr == null)
+        {
+            // Support empty passwords
+            keyStorePassAr = new char[]{};
+        }
+
+        try (InputStream ksf = new 
BufferedInputStream((provider.keyStoreInputStream())))
+        {
+            keystore.load(ksf, keyStorePassAr);
+            kmf.init(keystore, keyStorePassAr);
+        }
+
+        // load trust store from secrets or classpath
+        TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        KeyStore trustKeystore = 
KeyStore.getInstance(provider.trustStoreType());
+        try (InputStream ksf = new 
BufferedInputStream(provider.trustStoreInputStream()))
+        {
+            trustKeystore.load(ksf, provider.trustStorePassword());
+        }
+        trustManagerFactory.init(trustKeystore);
+    }
+
+    protected long fullyReadStream(InputStream inputStream) throws IOException
+    {
+        long totalRead = 0;
+        byte[] buffer = new byte[1024];
+        try (InputStream stream = inputStream)
+        {
+            int read;
+            while ((read = stream.read(buffer, 0, buffer.length)) != -1)
+            {
+                totalRead += read;
+            }
+        }
+        return totalRead;
+    }
+
+    private String readPassword(String resourceName) throws IOException
+    {
+        return 
IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream(resourceName)),
 StandardCharsets.UTF_8)
+                      .trim();
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SslConfigTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigTest.java
similarity index 94%
rename from 
cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SslConfigTest.java
rename to 
cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigTest.java
index d39a3a9..b1c6ba4 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SslConfigTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.cassandra.clients;
+package org.apache.cassandra.secrets;
 
 import java.util.Collections;
 import java.util.Map;
@@ -25,12 +25,12 @@ import java.util.TreeMap;
 
 import org.junit.Test;
 
-import static org.apache.cassandra.clients.SslConfig.KEYSTORE_BASE64_ENCODED;
-import static org.apache.cassandra.clients.SslConfig.KEYSTORE_PASSWORD;
-import static org.apache.cassandra.clients.SslConfig.KEYSTORE_PATH;
-import static org.apache.cassandra.clients.SslConfig.TRUSTSTORE_BASE64_ENCODED;
-import static org.apache.cassandra.clients.SslConfig.TRUSTSTORE_PASSWORD;
-import static org.apache.cassandra.clients.SslConfig.TRUSTSTORE_PATH;
+import static org.apache.cassandra.secrets.SslConfig.KEYSTORE_BASE64_ENCODED;
+import static org.apache.cassandra.secrets.SslConfig.KEYSTORE_PASSWORD;
+import static org.apache.cassandra.secrets.SslConfig.KEYSTORE_PATH;
+import static org.apache.cassandra.secrets.SslConfig.TRUSTSTORE_BASE64_ENCODED;
+import static org.apache.cassandra.secrets.SslConfig.TRUSTSTORE_PASSWORD;
+import static org.apache.cassandra.secrets.SslConfig.TRUSTSTORE_PATH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
index 9cb6e44..a23206f 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
@@ -32,7 +32,7 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.cassandra.bridge.CassandraVersion;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.LocalDataLayer;
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
index 4c344ec..80532ab 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
@@ -35,7 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.clients.Sidecar;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
index dc866b5..e1b9dcd 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
@@ -40,7 +40,7 @@ import org.junit.Test;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersion;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
 import org.apache.cassandra.spark.TestUtils;
 import org.apache.cassandra.spark.cdc.CommitLog;
 import org.apache.cassandra.spark.cdc.CommitLogProvider;
diff --git 
a/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore-password
 
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore-password
new file mode 100644
index 0000000..8982f8c
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore-password
@@ -0,0 +1 @@
+cassandra-analytics
diff --git 
a/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore.p12
 
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore.p12
new file mode 100644
index 0000000..18719e1
Binary files /dev/null and 
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore.p12
 differ
diff --git 
a/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore-password
 
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore-password
new file mode 100644
index 0000000..8982f8c
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore-password
@@ -0,0 +1 @@
+cassandra-analytics
diff --git 
a/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore.jks
 
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore.jks
new file mode 100644
index 0000000..e00acc5
Binary files /dev/null and 
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore.jks
 differ


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to