This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new b87b0ed CASSANDRA-18545: Provide a SecretsProvider interface to
abstract the secret provisioning
b87b0ed is described below
commit b87b0edd310d1ef93c507bbbb1ae51e1b0b319c6
Author: Francisco Guerrero <[email protected]>
AuthorDate: Tue May 23 13:56:48 2023 -0700
CASSANDRA-18545: Provide a SecretsProvider interface to abstract the secret
provisioning
This commit introduces the SecretsProvider interface that abstracts the
secrets provisioning.
This way different implementations of the SecretsProvider can be used to
provide SSL secrets
for the Analytics job. We provide an implementation,
SslConficSecretsProvider, which provides
secrets based on the configuration for the job.
Patch by Francisco Guerrero; Reviewed by Dinesh Joshi, Yifan Cai for
CASSANDRA-18545
---
.../java/org/apache/cassandra/clients/Sidecar.java | 17 +-
.../apache/cassandra/secrets/SecretsProvider.java | 108 +++++++++
.../cassandra/{clients => secrets}/SslConfig.java | 85 +++----
.../secrets/SslConfigSecretsProvider.java | 198 +++++++++++++++
.../org/apache/cassandra/spark/KryoRegister.java | 2 +-
.../cassandra/spark/data/CassandraDataLayer.java | 5 +-
.../spark/data/CassandraDataSourceHelper.java | 2 +-
.../secrets/SslConfigSecretsProviderTest.java | 270 +++++++++++++++++++++
.../{clients => secrets}/SslConfigTest.java | 14 +-
.../cassandra/spark/KryoSerializationTests.java | 2 +-
.../data/CassandraDataSourceHelperCacheTest.java | 2 +-
.../data/partitioner/JDKSerializationTests.java | 2 +-
.../secrets/fakecerts/client-keystore-password | 1 +
.../secrets/fakecerts/client-keystore.p12 | Bin 0 -> 2576 bytes
.../secrets/fakecerts/client-truststore-password | 1 +
.../secrets/fakecerts/client-truststore.jks | Bin 0 -> 1094 bytes
16 files changed, 639 insertions(+), 70 deletions(-)
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 71d8f43..bac09e2 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import o.a.c.sidecar.client.shaded.io.vertx.core.Vertx;
import o.a.c.sidecar.client.shaded.io.vertx.core.VertxOptions;
+import org.apache.cassandra.secrets.SecretsProvider;
import org.apache.cassandra.sidecar.client.HttpClientConfig;
import org.apache.cassandra.sidecar.client.SidecarClient;
import org.apache.cassandra.sidecar.client.SidecarConfig;
@@ -78,7 +79,7 @@ public final class Sidecar
public static SidecarClient from(SidecarInstancesProvider
sidecarInstancesProvider,
ClientConfig config,
- SslConfig sslConfig) throws IOException
+ SecretsProvider secretsProvider) throws
IOException
{
Vertx vertx = Vertx.vertx(new
VertxOptions().setUseDaemonThread(true).setWorkerPoolSize(config.maxPoolSize()));
@@ -90,16 +91,16 @@ public final class Sidecar
.maxChunkSize((int) config.maxBufferSize())
.userAgent(BuildInfo.READER_USER_AGENT);
- if (sslConfig != null)
+ if (secretsProvider != null)
{
builder = builder
.ssl(true)
- .keyStoreInputStream(sslConfig.keyStoreInputStream())
- .keyStorePassword(sslConfig.keyStorePassword())
- .keyStoreType(sslConfig.keyStoreType())
- .trustStoreInputStream(sslConfig.trustStoreInputStream())
- .trustStorePassword(sslConfig.trustStorePassword())
- .trustStoreType(sslConfig.trustStoreType());
+ .keyStoreInputStream(secretsProvider.keyStoreInputStream())
+
.keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
+ .keyStoreType(secretsProvider.keyStoreType())
+
.trustStoreInputStream(secretsProvider.trustStoreInputStream())
+
.trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
+ .trustStoreType(secretsProvider.trustStoreType());
}
HttpClientConfig httpClientConfig = builder.build();
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SecretsProvider.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SecretsProvider.java
new file mode 100644
index 0000000..a54b5eb
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SecretsProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.secrets;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Map;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A Secrets provider class
+ */
+public interface SecretsProvider
+{
+ /**
+ * Provides an initialization mechanism for the secrets after the factory
has created the instance of
+ * the object by passing the {@code environmentOptions}.
+ *
+ * @param environmentOptions the environment options
+ */
+ void initialize(@NotNull Map<String, String> environmentOptions);
+
+ /**
+ * @return true if the keystore secrets have been provided, false otherwise
+ */
+ boolean hasKeyStoreSecrets();
+
+ /**
+ * @return an input stream to the keystore source
+ * @throws IOException when an IO exception occurs
+ */
+ InputStream keyStoreInputStream() throws IOException;
+
+ /**
+ * @return a character array for the keystore password
+ */
+ char[] keyStorePassword();
+
+ /**
+ * @return the keystore type
+ */
+ default String keyStoreType()
+ {
+ return "PKCS12";
+ }
+
+ /**
+ * @return true if the truststore secrets have been provided, false
otherwise
+ */
+ boolean hasTrustStoreSecrets();
+
+ /**
+ * @return an input stream to the truststore source
+ * @throws IOException when an IO exception occurs
+ */
+ InputStream trustStoreInputStream() throws IOException;
+
+ /**
+ * @return a character array for the truststore password
+ */
+ char[] trustStorePassword();
+
+ /**
+ * @return the truststore type
+ */
+ default String trustStoreType()
+ {
+ return "JKS";
+ }
+
+ /**
+ * Validates the mutual TLS secrets
+ */
+ void validateMutualTLS();
+
+ /**
+ * Returns a secret that is found under the {@code secretName}. This name
would correspond for example
+ * to a file name under the secrets directory, or to an environment
variable that contains the secret name.
+ *
+ * @param secretName the name of the secret in the underlying secret
configuration.
+ * @return a secret that is found under the {@code secretName}
+ */
+ String secretByName(String secretName);
+
+ /**
+ * @return the path for the secrets directory
+ */
+ Path secretsPath();
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SslConfig.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfig.java
similarity index 90%
rename from
cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SslConfig.java
rename to
cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfig.java
index 2fd72b9..e2b7fbd 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SslConfig.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfig.java
@@ -17,17 +17,12 @@
* under the License.
*/
-package org.apache.cassandra.clients;
+package org.apache.cassandra.secrets;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Base64;
import java.util.Map;
import org.slf4j.Logger;
@@ -46,6 +41,7 @@ public class SslConfig implements Serializable
{
private static final long serialVersionUID = -3844712192096436932L;
private static final Logger LOGGER =
LoggerFactory.getLogger(SslConfig.class);
+ public static final String SECRETS_PATH = "SECRETS_PATH";
public static final String KEYSTORE_PATH = "KEYSTORE_PATH";
public static final String KEYSTORE_BASE64_ENCODED =
"KEYSTORE_BASE64_ENCODED";
public static final String KEYSTORE_PASSWORD = "KEYSTORE_PASSWORD";
@@ -56,6 +52,8 @@ public class SslConfig implements Serializable
public static final String TRUSTSTORE_PASSWORD = "TRUSTSTORE_PASSWORD";
public static final String TRUSTSTORE_TYPE = "TRUSTSTORE_TYPE";
public static final String DEFAULT_TRUSTSTORE_TYPE = "JKS";
+
+ protected String secretsPath;
protected String keyStorePath;
protected String base64EncodedKeyStore;
protected String keyStorePassword;
@@ -67,6 +65,7 @@ public class SslConfig implements Serializable
protected SslConfig(Builder<?> builder)
{
+ secretsPath = builder.secretsPath;
keyStorePath = builder.keyStorePath;
base64EncodedKeyStore = builder.base64EncodedKeyStore;
keyStorePassword = builder.keyStorePassword;
@@ -78,24 +77,11 @@ public class SslConfig implements Serializable
}
/**
- * @return an input stream to the keystore source
- * @throws IOException when an IO exception occurs
+ * @return the path to the secrets directory
*/
- public InputStream keyStoreInputStream() throws IOException
+ public String secretsPath()
{
- if (keyStorePath != null)
- {
- return Files.newInputStream(Paths.get(keyStorePath));
- }
- else if (base64EncodedKeyStore != null)
- {
- return new
ByteArrayInputStream(Base64.getDecoder().decode(base64EncodedKeyStore));
- }
- else
- {
- // It should never reach here
- throw new RuntimeException("keyStorePath or encodedKeyStore must
be provided");
- }
+ return secretsPath;
}
/**
@@ -130,23 +116,6 @@ public class SslConfig implements Serializable
return keyStoreType != null ? keyStoreType : DEFAULT_KEYSTORE_TYPE;
}
- /**
- * @return an input stream to the truststore source
- * @throws IOException when an IO exception occurs
- */
- public InputStream trustStoreInputStream() throws IOException
- {
- if (trustStorePath != null)
- {
- return Files.newInputStream(Paths.get(trustStorePath));
- }
- else if (base64EncodedTrustStore != null)
- {
- return new
ByteArrayInputStream(Base64.getDecoder().decode(base64EncodedTrustStore));
- }
- return null;
- }
-
/**
* @return the path to the trust store
*/
@@ -184,6 +153,7 @@ public class SslConfig implements Serializable
private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException
{
LOGGER.warn("Falling back to JDK deserialization");
+ this.secretsPath = readNullableString(in);
this.keyStorePath = readNullableString(in);
this.base64EncodedKeyStore = readNullableString(in);
this.keyStorePassword = readNullableString(in);
@@ -197,6 +167,7 @@ public class SslConfig implements Serializable
private void writeObject(ObjectOutputStream out) throws IOException
{
LOGGER.warn("Falling back to JDK serialization");
+ writeNullableString(out, secretsPath);
writeNullableString(out, keyStorePath);
writeNullableString(out, base64EncodedKeyStore);
writeNullableString(out, keyStorePassword);
@@ -232,6 +203,7 @@ public class SslConfig implements Serializable
public SslConfig read(Kryo kryo, Input in, Class type)
{
return new Builder<>()
+ .secretsPath(in.readString())
.keyStorePath(in.readString())
.base64EncodedKeyStore(in.readString())
.keyStorePassword(in.readString())
@@ -245,6 +217,7 @@ public class SslConfig implements Serializable
public void write(Kryo kryo, Output out, SslConfig config)
{
+ out.writeString(config.secretsPath);
out.writeString(config.keyStorePath);
out.writeString(config.base64EncodedKeyStore);
out.writeString(config.keyStorePassword);
@@ -259,6 +232,7 @@ public class SslConfig implements Serializable
@Nullable
public static SslConfig create(Map<String, String> options)
{
+ String secretsPath = MapUtils.getOrDefault(options, SECRETS_PATH,
null);
String keyStorePath = MapUtils.getOrDefault(options, KEYSTORE_PATH,
null);
String encodedKeyStore = MapUtils.getOrDefault(options,
KEYSTORE_BASE64_ENCODED, null);
String keyStorePassword = MapUtils.getOrDefault(options,
KEYSTORE_PASSWORD, null);
@@ -269,16 +243,18 @@ public class SslConfig implements Serializable
String trustStoreType = MapUtils.getOrDefault(options,
TRUSTSTORE_TYPE, null);
// If any of the values are provided we try to create a valid
SecretsConfig object
- if (keyStorePath != null
- || encodedKeyStore != null
- || keyStorePassword != null
- || keyStoreType != null
- || trustStorePath != null
- || encodedTrustStore != null
- || trustStorePassword != null
- || trustStoreType != null)
+ if (secretsPath != null
+ || keyStorePath != null
+ || encodedKeyStore != null
+ || keyStorePassword != null
+ || keyStoreType != null
+ || trustStorePath != null
+ || encodedTrustStore != null
+ || trustStorePassword != null
+ || trustStoreType != null)
{
Builder<?> validatedConfig = new Builder<>()
+ .secretsPath(secretsPath)
.keyStorePath(keyStorePath)
.base64EncodedKeyStore(encodedKeyStore)
.keyStorePassword(keyStorePassword)
@@ -303,6 +279,7 @@ public class SslConfig implements Serializable
*/
public static class Builder<T extends SslConfig.Builder<T>>
{
+ protected String secretsPath;
protected String keyStorePath;
protected String base64EncodedKeyStore;
protected String keyStorePassword;
@@ -321,6 +298,18 @@ public class SslConfig implements Serializable
return (T) this;
}
+ /**
+ * Sets the {@code secretsPath} and returns a reference to this
Builder enabling method chaining
+ *
+ * @param secretsPath the {@code secretsPath} to set
+ * @return a reference to this Builder
+ */
+ public T secretsPath(String secretsPath)
+ {
+ this.secretsPath = secretsPath;
+ return self();
+ }
+
/**
* Sets the {@code keyStorePath} and returns a reference to this
Builder enabling method chaining
*
@@ -470,7 +459,7 @@ public class SslConfig implements Serializable
if (trustStorePath == null && base64EncodedTrustStore == null)
{
throw new IllegalArgumentException(
- String.format("One of the '%s' or '%s' options
must be provided when the '%s' option is provided",
+ String.format("One of the '%s' or '%s' options must
be provided when the '%s' option is provided",
TRUSTSTORE_PATH,
TRUSTSTORE_BASE64_ENCODED, TRUSTSTORE_PASSWORD));
}
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfigSecretsProvider.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfigSecretsProvider.java
new file mode 100644
index 0000000..dfdb8c0
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/secrets/SslConfigSecretsProvider.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.secrets;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A {@link SecretsProvider} implementation based on the SSL configuration.
+ */
+public class SslConfigSecretsProvider implements SecretsProvider
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SslConfigSecretsProvider.class);
+ private final SslConfig config;
+
+ public SslConfigSecretsProvider(SslConfig config)
+ {
+ this.config = config;
+ }
+
+ @Override
+ public void initialize(@NotNull Map<String, String> environmentOptions)
+ {
+ // Do nothing
+ }
+
+ @Override
+ public boolean hasKeyStoreSecrets()
+ {
+ String keyStorePath = config.keyStorePath();
+ if (keyStorePath != null)
+ {
+ return Files.exists(Paths.get(keyStorePath)) && keyStorePassword()
!= null;
+ }
+ else
+ {
+ return config.base64EncodedKeyStore() != null &&
!config.base64EncodedKeyStore().isEmpty();
+ }
+ }
+
+ @Override
+ public InputStream keyStoreInputStream() throws IOException
+ {
+ if (config.keyStorePath() != null)
+ {
+ return Files.newInputStream(Paths.get(config.keyStorePath()));
+ }
+ else if (config.base64EncodedKeyStore() != null)
+ {
+ return new
ByteArrayInputStream(Base64.getDecoder().decode(config.base64EncodedKeyStore()));
+ }
+ // it should never reach here
+ throw new RuntimeException("keyStorePath or encodedKeyStore must be
provided");
+ }
+
+ @Override
+ public char[] keyStorePassword()
+ {
+ return Objects.requireNonNull(config.keyStorePassword(),
"keyStorePassword must be not null").toCharArray();
+ }
+
+ @Override
+ public String keyStoreType()
+ {
+ return config.keyStoreType();
+ }
+
+ @Override
+ public boolean hasTrustStoreSecrets()
+ {
+ String trustStorePath = config.trustStorePath();
+ if (trustStorePath != null)
+ {
+ return Files.exists(Paths.get(trustStorePath)) &&
trustStorePassword() != null;
+ }
+ return config.base64EncodedTrustStore() != null &&
!config.base64EncodedTrustStore().isEmpty();
+ }
+
+ @Override
+ public InputStream trustStoreInputStream() throws IOException
+ {
+ if (config.trustStorePath() != null)
+ {
+ return Files.newInputStream(Paths.get(config.trustStorePath()));
+ }
+ else if (config.base64EncodedTrustStore() != null)
+ {
+ return new
ByteArrayInputStream(Base64.getDecoder().decode(config.base64EncodedTrustStore()));
+ }
+ return null;
+ }
+
+ @Override
+ public char[] trustStorePassword()
+ {
+ String password = config.trustStorePassword();
+ return password != null ? password.toCharArray() : null;
+ }
+
+ @Override
+ public String trustStoreType()
+ {
+ return config.trustStoreType();
+ }
+
+ @Override
+ public void validateMutualTLS()
+ {
+ boolean fail = false;
+
+ String keyStorePath = config.keyStorePath();
+ if (keyStorePath != null)
+ {
+ if (!Files.exists(Paths.get(keyStorePath)))
+ {
+ LOGGER.warn("Provided keystore path option does not exist in
the file system keystorePath={}", keyStorePath);
+ fail = true;
+ }
+ }
+ else if (config.base64EncodedKeyStore() == null ||
config.base64EncodedKeyStore().isEmpty())
+ {
+ LOGGER.warn("Neither keystore path or encoded keystore options
were provided");
+ fail = true;
+ }
+
+ if (keyStorePassword() == null)
+ {
+ LOGGER.warn("No keystore password option provided");
+ }
+
+ String trustStorePath = config.trustStorePath();
+ if (trustStorePath != null && !Files.exists(Paths.get(trustStorePath)))
+ {
+ LOGGER.warn("Provided truststore path option does not exist in the
file system trustStorePath={}", trustStorePath);
+ fail = true;
+ }
+
+ if ((trustStorePath != null || config.base64EncodedTrustStore() !=
null) && trustStorePassword() == null)
+ {
+ LOGGER.warn("No truststore password option provided");
+ fail = true;
+ }
+
+ if (fail)
+ {
+ throw new RuntimeException("No valid keystore/password provided in
options");
+ }
+ }
+
+ @Override
+ public String secretByName(String secretName)
+ {
+ throw new UnsupportedOperationException("Currently not supported");
+ }
+
+ @Override
+ public Path secretsPath()
+ {
+ return config.secretsPath() != null ? Paths.get(config.secretsPath())
: null;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SecretsConfigProvider{"
+ + "config=" + config
+ + '}';
+ }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
index d00b705..54d14bf 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.bridge.BigNumberConfigImpl;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.clients.SidecarInstanceImpl;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
import org.apache.cassandra.spark.data.CassandraDataLayer;
import org.apache.cassandra.spark.data.LocalDataLayer;
import org.apache.cassandra.spark.data.ReplicationFactor;
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 259e468..fdc51bc 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -69,7 +69,8 @@ import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.clients.ExecutorHolder;
import org.apache.cassandra.clients.Sidecar;
import org.apache.cassandra.clients.SidecarInstanceImpl;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
import org.apache.cassandra.sidecar.client.SidecarClient;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
@@ -390,7 +391,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
{
sidecar = Sidecar.from(new SimpleSidecarInstancesProvider(new
ArrayList<>(clusterConfig)),
sidecarClientConfig,
- sslConfig);
+ new SslConfigSecretsProvider(sslConfig));
}
catch (IOException ioException)
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
index 5617c05..7f00e0a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.clients.Sidecar;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
import org.apache.cassandra.spark.utils.MapUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigSecretsProviderTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigSecretsProviderTest.java
new file mode 100644
index 0000000..a56d8e1
--- /dev/null
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigSecretsProviderTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.secrets;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SslConfigSecretsProviderTest
+{
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ String keyStorePassword;
+ String trustStorePassword;
+
+ @Before
+ public void setup() throws IOException
+ {
+ Path path = folder.newFolder("secrets-config").toPath();
+
+ try (InputStream stream =
getClass().getResourceAsStream("/secrets/fakecerts/client-keystore.p12"))
+ {
+ Files.copy(Objects.requireNonNull(stream),
path.resolve("keystore.p12"), StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ try (InputStream stream =
getClass().getResourceAsStream("/secrets/fakecerts/client-truststore.jks"))
+ {
+ Files.copy(Objects.requireNonNull(stream),
path.resolve("truststore.jks"), StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ keyStorePassword =
readPassword("/secrets/fakecerts/client-keystore-password");
+ trustStorePassword =
readPassword("/secrets/fakecerts/client-truststore-password");
+ }
+
+ @Test
+ public void testInitializationSucceeds()
+ {
+ SslConfig config = new SslConfig.Builder<>().build();
+ SecretsProvider secretsConfigProvider = new
SslConfigSecretsProvider(config);
+ assertNotNull(secretsConfigProvider);
+ }
+
+ @Test
+ public void testSuccessWithCertsPaths() throws IOException
+ {
+ Map<String, String> jobOptions = new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ jobOptions.put(SslConfig.KEYSTORE_PATH,
folder.getRoot().getAbsolutePath() + "/secrets-config/keystore.p12");
+ jobOptions.put(SslConfig.KEYSTORE_PASSWORD, keyStorePassword);
+ jobOptions.put(SslConfig.KEYSTORE_TYPE, "PKCS12");
+ jobOptions.put(SslConfig.TRUSTSTORE_PATH,
folder.getRoot().getAbsolutePath() + "/secrets-config/truststore.jks");
+ jobOptions.put(SslConfig.TRUSTSTORE_PASSWORD, trustStorePassword);
+ jobOptions.put(SslConfig.TRUSTSTORE_TYPE, "JKS");
+
+ SslConfig config = SslConfig.create(jobOptions);
+ SecretsProvider provider = new SslConfigSecretsProvider(config);
+ provider.initialize(Collections.emptyMap());
+
+ assertTrue(provider.hasKeyStoreSecrets());
+ long totalRead = fullyReadStream(provider.keyStoreInputStream()); //
Read keyStore
+ assertTrue(totalRead > 0);
+ assertEquals("PKCS12", provider.keyStoreType());
+ assertEquals(keyStorePassword, new
String(provider.keyStorePassword()));
+ assertTrue(provider.hasTrustStoreSecrets());
+ totalRead = fullyReadStream(provider.trustStoreInputStream()); // Read
trustStore
+ assertTrue(totalRead > 0);
+ assertEquals("JKS", provider.trustStoreType());
+ assertEquals(trustStorePassword, new
String(provider.trustStorePassword()));
+ provider.validateMutualTLS();
+
+ try
+ {
+ buildSslConfig(provider);
+ }
+ catch (GeneralSecurityException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSuccessWithEncodedCerts() throws IOException
+ {
+ Map<String, String> jobOptions = new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ String keyStorePath = folder.getRoot().getAbsolutePath() +
"/secrets-config/keystore.p12";
+ String encodedKeyStore =
Base64.getEncoder().encodeToString(Files.readAllBytes(Paths.get(keyStorePath)));
+ String trustStorePath = folder.getRoot().getAbsolutePath() +
"/secrets-config/truststore.jks";
+ String encodedTrustStore =
Base64.getEncoder().encodeToString(Files.readAllBytes(Paths.get(trustStorePath)));
+
+ jobOptions.put(SslConfig.KEYSTORE_BASE64_ENCODED, encodedKeyStore);
+ jobOptions.put(SslConfig.KEYSTORE_PASSWORD, keyStorePassword);
+ jobOptions.put(SslConfig.KEYSTORE_TYPE, "PKCS12");
+ jobOptions.put(SslConfig.TRUSTSTORE_BASE64_ENCODED, encodedTrustStore);
+ jobOptions.put(SslConfig.TRUSTSTORE_PASSWORD, trustStorePassword);
+ jobOptions.put(SslConfig.TRUSTSTORE_TYPE, "JKS");
+
+ SslConfig config = SslConfig.create(jobOptions);
+ SecretsProvider provider = new SslConfigSecretsProvider(config);
+ provider.initialize(Collections.emptyMap());
+
+ assertTrue(provider.hasKeyStoreSecrets());
+ long totalRead = fullyReadStream(provider.keyStoreInputStream()); //
Read keyStore
+ assertTrue(totalRead > 0);
+ assertEquals("PKCS12", provider.keyStoreType());
+ assertEquals(keyStorePassword, new
String(provider.keyStorePassword()));
+ assertTrue(provider.hasTrustStoreSecrets());
+ totalRead = fullyReadStream(provider.trustStoreInputStream()); // Read
trustStore
+ assertTrue(totalRead > 0);
+ assertEquals("JKS", provider.trustStoreType());
+ assertEquals(trustStorePassword, new
String(provider.trustStorePassword()));
+ provider.validateMutualTLS();
+
+ try
+ {
+ buildSslConfig(provider);
+ }
+ catch (GeneralSecurityException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInvalidPathToKeyStore()
+ {
+ Map<String, String> jobOptions = new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ jobOptions.put(SslConfig.KEYSTORE_PATH,
folder.getRoot().getAbsolutePath() +
"/secrets-config/non-existent-keystore.p12");
+ jobOptions.put(SslConfig.KEYSTORE_PASSWORD, keyStorePassword);
+ jobOptions.put(SslConfig.KEYSTORE_TYPE, "PKCS12");
+ jobOptions.put(SslConfig.TRUSTSTORE_PATH,
folder.getRoot().getAbsolutePath() + "/secrets-config/truststore.jks");
+ jobOptions.put(SslConfig.TRUSTSTORE_PASSWORD, trustStorePassword);
+ jobOptions.put(SslConfig.TRUSTSTORE_TYPE, "JKS");
+
+ SslConfig config = SslConfig.create(jobOptions);
+ SecretsProvider provider = new SslConfigSecretsProvider(config);
+ provider.initialize(Collections.emptyMap());
+
+ assertFalse(provider.hasKeyStoreSecrets());
+ try
+ {
+ provider.validateMutualTLS();
+ fail("it should fail when the path is invalid");
+ }
+ catch (RuntimeException e)
+ {
+ assertEquals("No valid keystore/password provided in options",
e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInvalidPathToTrustStore()
+ {
+ Map<String, String> jobOptions = new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ jobOptions.put(SslConfig.KEYSTORE_PATH,
folder.getRoot().getAbsolutePath() + "/secrets-config/keystore.p12");
+ jobOptions.put(SslConfig.KEYSTORE_PASSWORD, keyStorePassword);
+ jobOptions.put(SslConfig.KEYSTORE_TYPE, "PKCS12");
+ jobOptions.put(SslConfig.TRUSTSTORE_PATH,
folder.getRoot().getAbsolutePath() +
"/secrets-config/non-existent-truststore.jks");
+ jobOptions.put(SslConfig.TRUSTSTORE_PASSWORD, trustStorePassword);
+ jobOptions.put(SslConfig.TRUSTSTORE_TYPE, "JKS");
+
+ SslConfig config = SslConfig.create(jobOptions);
+ SecretsProvider provider = new SslConfigSecretsProvider(config);
+ provider.initialize(Collections.emptyMap());
+
+ assertFalse(provider.hasTrustStoreSecrets());
+ try
+ {
+ provider.validateMutualTLS();
+ fail("it should fail when the path is invalid");
+ }
+ catch (RuntimeException e)
+ {
+ assertEquals("No valid keystore/password provided in options",
e.getMessage());
+ }
+ }
+
+ private void buildSslConfig(SecretsProvider provider) throws IOException,
GeneralSecurityException
+ {
+ provider.validateMutualTLS();
+
+ KeyManagerFactory kmf =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore keystore = KeyStore.getInstance(provider.keyStoreType());
+ char[] keyStorePassAr = provider.keyStorePassword();
+
+ if (keyStorePassAr == null)
+ {
+ // Support empty passwords
+ keyStorePassAr = new char[]{};
+ }
+
+ try (InputStream ksf = new
BufferedInputStream((provider.keyStoreInputStream())))
+ {
+ keystore.load(ksf, keyStorePassAr);
+ kmf.init(keystore, keyStorePassAr);
+ }
+
+ // load trust store from secrets or classpath
+ TrustManagerFactory trustManagerFactory =
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ KeyStore trustKeystore =
KeyStore.getInstance(provider.trustStoreType());
+ try (InputStream ksf = new
BufferedInputStream(provider.trustStoreInputStream()))
+ {
+ trustKeystore.load(ksf, provider.trustStorePassword());
+ }
+ trustManagerFactory.init(trustKeystore);
+ }
+
+ protected long fullyReadStream(InputStream inputStream) throws IOException
+ {
+ long totalRead = 0;
+ byte[] buffer = new byte[1024];
+ try (InputStream stream = inputStream)
+ {
+ int read;
+ while ((read = stream.read(buffer, 0, buffer.length)) != -1)
+ {
+ totalRead += read;
+ }
+ }
+ return totalRead;
+ }
+
+ private String readPassword(String resourceName) throws IOException
+ {
+ return
IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream(resourceName)),
StandardCharsets.UTF_8)
+ .trim();
+ }
+}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SslConfigTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigTest.java
similarity index 94%
rename from
cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SslConfigTest.java
rename to
cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigTest.java
index d39a3a9..b1c6ba4 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SslConfigTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/SslConfigTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.cassandra.clients;
+package org.apache.cassandra.secrets;
import java.util.Collections;
import java.util.Map;
@@ -25,12 +25,12 @@ import java.util.TreeMap;
import org.junit.Test;
-import static org.apache.cassandra.clients.SslConfig.KEYSTORE_BASE64_ENCODED;
-import static org.apache.cassandra.clients.SslConfig.KEYSTORE_PASSWORD;
-import static org.apache.cassandra.clients.SslConfig.KEYSTORE_PATH;
-import static org.apache.cassandra.clients.SslConfig.TRUSTSTORE_BASE64_ENCODED;
-import static org.apache.cassandra.clients.SslConfig.TRUSTSTORE_PASSWORD;
-import static org.apache.cassandra.clients.SslConfig.TRUSTSTORE_PATH;
+import static org.apache.cassandra.secrets.SslConfig.KEYSTORE_BASE64_ENCODED;
+import static org.apache.cassandra.secrets.SslConfig.KEYSTORE_PASSWORD;
+import static org.apache.cassandra.secrets.SslConfig.KEYSTORE_PATH;
+import static org.apache.cassandra.secrets.SslConfig.TRUSTSTORE_BASE64_ENCODED;
+import static org.apache.cassandra.secrets.SslConfig.TRUSTSTORE_PASSWORD;
+import static org.apache.cassandra.secrets.SslConfig.TRUSTSTORE_PATH;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
index 9cb6e44..a23206f 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
@@ -32,7 +32,7 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.cassandra.bridge.CassandraVersion;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.LocalDataLayer;
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
index 4c344ec..80532ab 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataSourceHelperCacheTest.java
@@ -35,7 +35,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.clients.Sidecar;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
index dc866b5..e1b9dcd 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
@@ -40,7 +40,7 @@ import org.junit.Test;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
-import org.apache.cassandra.clients.SslConfig;
+import org.apache.cassandra.secrets.SslConfig;
import org.apache.cassandra.spark.TestUtils;
import org.apache.cassandra.spark.cdc.CommitLog;
import org.apache.cassandra.spark.cdc.CommitLogProvider;
diff --git
a/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore-password
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore-password
new file mode 100644
index 0000000..8982f8c
--- /dev/null
+++
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore-password
@@ -0,0 +1 @@
+cassandra-analytics
diff --git
a/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore.p12
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore.p12
new file mode 100644
index 0000000..18719e1
Binary files /dev/null and
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-keystore.p12
differ
diff --git
a/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore-password
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore-password
new file mode 100644
index 0000000..8982f8c
--- /dev/null
+++
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore-password
@@ -0,0 +1 @@
+cassandra-analytics
diff --git
a/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore.jks
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore.jks
new file mode 100644
index 0000000..e00acc5
Binary files /dev/null and
b/cassandra-analytics-core/src/test/resources/secrets/fakecerts/client-truststore.jks
differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]