This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new f24951a CASSANDRA-18810: Cassandra Analytics Start-Up Validation f24951a is described below commit f24951ab6ea2b1e9af4013b030675c70d31adb90 Author: Yuriy Semchyshyn <yu...@semchyshyn.com> AuthorDate: Mon Aug 14 14:09:12 2023 -0500 CASSANDRA-18810: Cassandra Analytics Start-Up Validation Patch by Yuriy Semchyshyn; Reviewed by Dinesh Joshi, Francisco Guerrero, Yifan Cai for CASSANDRA-18810 --- CHANGES.txt | 1 + build.gradle | 1 + .../java/org/apache/cassandra/clients/Sidecar.java | 36 ++++-- .../cassandra/spark/bulkwriter/BulkSparkConf.java | 18 ++- .../bulkwriter/CassandraBulkWriterContext.java | 10 +- .../spark/bulkwriter/CassandraClusterInfo.java | 8 ++ .../spark/bulkwriter/CassandraContext.java | 16 ++- .../cassandra/spark/bulkwriter/ClusterInfo.java | 3 +- .../cassandra/spark/bulkwriter/RecordWriter.java | 2 + .../cassandra/spark/data/CassandraDataLayer.java | 20 ++- .../spark/data/CassandraDataSourceHelper.java | 3 + .../spark/validation/CassandraValidation.java | 58 +++++++++ .../spark/validation/KeyStoreValidation.java | 94 ++++++++++++++ .../spark/validation/SidecarValidation.java | 58 +++++++++ .../cassandra/spark/validation/SslValidation.java | 48 +++++++ .../spark/validation/StartupValidatable.java | 33 +++++ .../spark/validation/StartupValidation.java | 49 +++++++ .../spark/validation/StartupValidator.java | 79 ++++++++++++ .../spark/validation/TrustStoreValidation.java | 93 ++++++++++++++ .../cassandra/secrets/TestSecretsProvider.java | 141 +++++++++++++++++++++ .../spark/bulkwriter/MockBulkWriterContext.java | 9 ++ .../spark/validation/KeyStoreValidationTests.java | 99 +++++++++++++++ .../spark/validation/StartupValidatorTests.java | 74 +++++++++++ .../cassandra/spark/validation/TestValidation.java | 52 ++++++++ .../validation/TrustStoreValidationTests.java | 97 ++++++++++++++ .../resources/validation/keystore-certificate.p12 | Bin 0 -> 1046 bytes .../test/resources/validation/keystore-empty.p12 | Bin 0 -> 103 bytes .../resources/validation/keystore-malformed.p12 | 1 + .../test/resources/validation/keystore-private.p12 | Bin 0 -> 2520 bytes .../test/resources/validation/keystore-secret.p12 | Bin 0 -> 405 bytes .../org/apache/cassandra/spark/utils/Throwing.java | 20 +++ 31 files changed, 1102 insertions(+), 21 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 37b126d..a26e455 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Cassandra Analytics Start-Up Validation (CASSANDRA-18810) * Expose per partition on-disk usage through new DataFrame that utilizes the Index.db SSTable file components (CASSANDRA-18683) * Fix bulk writes with Buffered RowBufferMode (CASSANDRA-18692) * Minor Refactoring to Improve Code Reusability (CASSANDRA-18684) diff --git a/build.gradle b/build.gradle index 837c6fe..eb689a4 100644 --- a/build.gradle +++ b/build.gradle @@ -70,6 +70,7 @@ tasks.register('buildIgnoreRatList', Exec) { description 'Builds a list of ignored files for the rat task from the unversioned git files' commandLine 'bash', '-c', 'git clean --force -d --dry-run -x | cut -c 14-' doFirst { + Files.createDirectories(file("${buildDir}").toPath()) standardOutput new FileOutputStream("${buildDir}/.rat-excludes.txt") } // allows task to fail when git/cut commands are unavailable or fail diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java index 0871ddd..f1f7c1f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java @@ -37,7 +37,8 @@ import o.a.c.sidecar.client.shaded.io.vertx.core.VertxOptions; import org.apache.cassandra.secrets.SecretsProvider; import org.apache.cassandra.sidecar.client.HttpClientConfig; import org.apache.cassandra.sidecar.client.SidecarClient; -import org.apache.cassandra.sidecar.client.SidecarConfig; +import org.apache.cassandra.sidecar.client.SidecarClientConfig; +import org.apache.cassandra.sidecar.client.SidecarClientConfigImpl; import org.apache.cassandra.sidecar.client.SidecarInstance; import org.apache.cassandra.sidecar.client.SidecarInstancesProvider; import org.apache.cassandra.sidecar.client.VertxHttpClient; @@ -49,6 +50,10 @@ import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; import org.apache.cassandra.spark.data.FileType; import org.apache.cassandra.spark.utils.BuildInfo; import org.apache.cassandra.spark.utils.MapUtils; +import org.apache.cassandra.spark.validation.KeyStoreValidation; +import org.apache.cassandra.spark.validation.SslValidation; +import org.apache.cassandra.spark.validation.StartupValidator; +import org.apache.cassandra.spark.validation.TrustStoreValidation; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_OVERRIDE; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_SIZE; @@ -98,15 +103,18 @@ public final class Sidecar .trustStoreInputStream(secretsProvider.trustStoreInputStream()) .trustStorePassword(String.valueOf(secretsProvider.trustStorePassword())) .trustStoreType(secretsProvider.trustStoreType()); + + StartupValidator.instance().register(new KeyStoreValidation(secretsProvider)); + StartupValidator.instance().register(new TrustStoreValidation(secretsProvider)); } HttpClientConfig httpClientConfig = builder.build(); - SidecarConfig sidecarConfig = new SidecarConfig.Builder() - .maxRetries(config.maxRetries()) - .retryDelayMillis(config.millisToSleep()) - .maxRetryDelayMillis(config.maxMillisToSleep()) - .build(); + SidecarClientConfig sidecarConfig = SidecarClientConfigImpl.builder() + .maxRetries(config.maxRetries()) + .retryDelayMillis(config.millisToSleep()) + .maxRetryDelayMillis(config.maxMillisToSleep()) + .build(); return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider); } @@ -129,16 +137,20 @@ public final class Sidecar .ssl(conf.hasKeystoreAndKeystorePassword()) .build(); - SidecarConfig sidecarConfig = new SidecarConfig.Builder() - .maxRetries(5) - .retryDelayMillis(200) - .maxRetryDelayMillis(500) - .build(); + StartupValidator.instance().register(new SslValidation(conf)); + StartupValidator.instance().register(new KeyStoreValidation(conf)); + StartupValidator.instance().register(new TrustStoreValidation(conf)); + + SidecarClientConfig sidecarConfig = SidecarClientConfigImpl.builder() + .maxRetries(5) + .retryDelayMillis(200) + .maxRetryDelayMillis(500) + .build(); return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider); } - public static SidecarClient buildClient(SidecarConfig sidecarConfig, + public static SidecarClient buildClient(SidecarClientConfig sidecarConfig, Vertx vertx, HttpClientConfig httpClientConfig, SidecarInstancesProvider clusterConfig) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index 798f259..298d204 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -200,10 +200,17 @@ public class BulkSparkConf implements Serializable } } - /* - * This method Will throw if the SSL configuration is incorrect (PATH provided w/o password, for example) + /** + * Validates the SSL configuration present and throws an exception if it is incorrect + * + * @throws NullPointerException if the mTLS KeyStore password is provided, + * but both file path and base64 string are missing; + * or if either mTLS TrustStore file path or base64 string is provided, + * but the password is missing + * @throws IllegalArgumentException if the mTLS TrustStore password is provided, + * but both file path and base64 string are missing */ - protected void validateSslConfiguration() + public void validateSslConfiguration() { if (getKeyStorePassword() != null) { @@ -479,4 +486,9 @@ public class BulkSparkConf implements Serializable { return keystorePassword != null && (keystorePath != null || keystoreBase64Encoded != null); } + + public boolean hasTruststoreAndTruststorePassword() + { + return truststorePassword != null && (truststorePath != null || truststoreBase64Encoded != null); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index 3789b63..149df99 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -57,11 +57,12 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial private final SchemaInfo schemaInfo; private CassandraBulkWriterContext(@NotNull BulkSparkConf conf, + @NotNull CassandraClusterInfo clusterInfo, @NotNull StructType dfSchema, SparkContext sparkContext) { this.conf = conf; - clusterInfo = new CassandraClusterInfo(conf); + this.clusterInfo = clusterInfo; CassandraRing<RingInstance> ring = clusterInfo.getRing(true); jobInfo = new CassandraJobInfo(conf, new TokenPartitioner(ring, conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores())); @@ -93,10 +94,15 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial Preconditions.checkNotNull(dfSchema); BulkSparkConf conf = new BulkSparkConf(sparkContext.getConf(), strOptions); - CassandraBulkWriterContext bulkWriterContext = new CassandraBulkWriterContext(conf, dfSchema, sparkContext); + CassandraClusterInfo clusterInfo = new CassandraClusterInfo(conf); + + clusterInfo.startupValidate(); + + CassandraBulkWriterContext bulkWriterContext = new CassandraBulkWriterContext(conf, clusterInfo, dfSchema, sparkContext); ShutdownHookManager.addShutdownHook(org.apache.spark.util.ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY(), ScalaFunctions.wrapLambda(bulkWriterContext::shutdown)); bulkWriterContext.dialHome(sparkContext.version()); + return bulkWriterContext; } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java index e6f145e..94663d3 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java @@ -534,4 +534,12 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable } return false; } + + // Startup Validation + + @Override + public void startupValidate() + { + getCassandraContext().startupValidate(); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java index e5b6853..5a01845 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java @@ -30,9 +30,13 @@ import org.apache.cassandra.clients.Sidecar; import org.apache.cassandra.sidecar.client.SidecarClient; import org.apache.cassandra.sidecar.client.SidecarInstance; import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider; +import org.apache.cassandra.spark.validation.CassandraValidation; +import org.apache.cassandra.spark.validation.SidecarValidation; +import org.apache.cassandra.spark.validation.StartupValidatable; +import org.apache.cassandra.spark.validation.StartupValidator; import org.jetbrains.annotations.NotNull; -public class CassandraContext implements Closeable +public class CassandraContext implements StartupValidatable, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraContext.class); @NotNull @@ -96,4 +100,14 @@ public class CassandraContext implements Closeable { return conf; } + + // Startup Validation + + @Override + public void startupValidate() + { + StartupValidator.instance().register(new SidecarValidation(sidecarClient)); + StartupValidator.instance().register(new CassandraValidation(sidecarClient)); + StartupValidator.instance().perform(); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java index 5b65a55..3d67093 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java @@ -27,8 +27,9 @@ import org.apache.cassandra.sidecar.common.data.TimeSkewResponse; import org.apache.cassandra.spark.bulkwriter.token.CassandraRing; import org.apache.cassandra.spark.common.client.InstanceState; import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.validation.StartupValidatable; -public interface ClusterInfo extends Serializable +public interface ClusterInfo extends StartupValidatable, Serializable { void refreshClusterInfo(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java index 426cb8a..b8bc803 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java @@ -77,6 +77,8 @@ public class RecordWriter implements Serializable this.columnNames = columnNames; this.taskContextSupplier = taskContextSupplier; this.tableWriterSupplier = tableWriterSupplier; + + writerContext.cluster().startupValidate(); } private Range<BigInteger> getTokenRange(TaskContext taskContext) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 39c69c1..a95b6aa 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -96,6 +96,10 @@ import org.apache.cassandra.spark.utils.CqlUtils; import org.apache.cassandra.spark.utils.MapUtils; import org.apache.cassandra.spark.utils.ScalaFunctions; import org.apache.cassandra.spark.utils.ThrowableUtils; +import org.apache.cassandra.spark.validation.CassandraValidation; +import org.apache.cassandra.spark.validation.SidecarValidation; +import org.apache.cassandra.spark.validation.StartupValidatable; +import org.apache.cassandra.spark.validation.StartupValidator; import org.apache.spark.sql.types.DataType; import org.apache.spark.util.ShutdownHookManager; import org.jetbrains.annotations.NotNull; @@ -103,7 +107,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.cassandra.spark.utils.Properties.NODE_STATUS_NOT_CONSIDERED; -public class CassandraDataLayer extends PartitionedDataLayer implements Serializable +public class CassandraDataLayer extends PartitionedDataLayer implements StartupValidatable, Serializable { private static final long serialVersionUID = -9038926850642710787L; @@ -200,7 +204,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField); } this.rfMap = rfMap; - initInstanceMap(); + this.initInstanceMap(); + this.startupValidate(); } public void initialize(@NotNull ClientConfig options) @@ -639,6 +644,16 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ return new CassandraRing(partitioner, keyspace, replicationFactor, instances); } + // Startup Validation + + @Override + public void startupValidate() + { + StartupValidator.instance().register(new SidecarValidation(sidecar)); + StartupValidator.instance().register(new CassandraValidation(sidecar)); + StartupValidator.instance().perform(); + } + // JDK Serialization @SuppressWarnings("unchecked") @@ -685,6 +700,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ } this.rfMap = (Map<String, ReplicationFactor>) in.readObject(); this.initInstanceMap(); + this.startupValidate(); } private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java index 7f00e0a..08ea508 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java @@ -126,6 +126,9 @@ public final class CassandraDataSourceHelper Sidecar.ClientConfig.create(options), SslConfig.create(options)); initializeDataLayerFn.accept(dataLayer, config); + + dataLayer.startupValidate(); + return dataLayer; } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/CassandraValidation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/CassandraValidation.java new file mode 100644 index 0000000..148200d --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/CassandraValidation.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.common.data.HealthResponse; + +/** + * A startup validation that checks the connectivity and health of Cassandra + */ +public class CassandraValidation implements StartupValidation +{ + private static final int TIMEOUT_SECONDS = 30; + + private final SidecarClient sidecar; + + public CassandraValidation(SidecarClient sidecar) + { + this.sidecar = sidecar; + } + + @Override + public void validate() + { + HealthResponse health; + try + { + health = sidecar.cassandraHealth().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + catch (Throwable throwable) + { + throw new RuntimeException("Sidecar is unreachable", throwable); + } + if (!health.isOk()) + { + throw new RuntimeException("Cassandra is not healthy: " + health.status()); + } + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/KeyStoreValidation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/KeyStoreValidation.java new file mode 100644 index 0000000..4ee2faa --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/KeyStoreValidation.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import java.io.IOException; +import java.io.InputStream; +import java.security.GeneralSecurityException; +import java.security.Key; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.util.Enumeration; +import java.util.function.Supplier; + +import org.apache.cassandra.secrets.SecretsProvider; +import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; +import org.apache.cassandra.spark.utils.Throwing; +import org.jetbrains.annotations.NotNull; + +/** + * A startup validation that checks the KeyStore + */ +public class KeyStoreValidation implements StartupValidation +{ + private final boolean configured; + private final String type; + private final char[] password; + private final Supplier<InputStream> stream; + + public KeyStoreValidation(@NotNull SecretsProvider secrets) + { + configured = secrets.hasKeyStoreSecrets(); + type = secrets.keyStoreType(); + password = secrets.keyStorePassword(); + stream = Throwing.supplier(() -> secrets.keyStoreInputStream()); + } + + public KeyStoreValidation(@NotNull BulkSparkConf configuration) + { + configured = configuration.hasKeystoreAndKeystorePassword(); + type = configuration.getKeyStoreTypeOrDefault(); + password = configuration.getKeyStorePassword().toCharArray(); + stream = () -> configuration.getKeyStore(); + } + + @Override + public void validate() + { + try + { + if (!configured) + { + throw new RuntimeException("KeyStore is not configured"); + } + + KeyStore keyStore = KeyStore.getInstance(type); + keyStore.load(stream.get(), password); + if (keyStore.size() == 0) + { + throw new RuntimeException("KeyStore is empty"); + } + + for (Enumeration<String> aliases = keyStore.aliases(); aliases.hasMoreElements();) + { + Key key = keyStore.getKey(aliases.nextElement(), password); + if (key != null && key instanceof PrivateKey) + { + return; // KeyStore contains a private key + } + } + throw new RuntimeException("KeyStore contains no private keys"); + } + catch (IOException | GeneralSecurityException exception) + { + throw new RuntimeException("KeyStore is misconfigured", exception); + } + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SidecarValidation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SidecarValidation.java new file mode 100644 index 0000000..fda7b18 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SidecarValidation.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.common.data.HealthResponse; + +/** + * A startup validation that checks the connectivity and health of Sidecar + */ +public class SidecarValidation implements StartupValidation +{ + private static final int TIMEOUT_SECONDS = 30; + + private final SidecarClient sidecar; + + public SidecarValidation(SidecarClient sidecar) + { + this.sidecar = sidecar; + } + + @Override + public void validate() + { + HealthResponse health; + try + { + health = sidecar.sidecarHealth().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + catch (Throwable throwable) + { + throw new RuntimeException("Sidecar is unreachable", throwable); + } + if (!health.isOk()) + { + throw new RuntimeException("Sidecar is unhealthy: " + health.status()); + } + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SslValidation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SslValidation.java new file mode 100644 index 0000000..52edc13 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SslValidation.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; + +/** + * A startup validation that checks the SSL configuration + */ +public class SslValidation implements StartupValidation +{ + private final BulkSparkConf configuration; + + public SslValidation(BulkSparkConf configuration) + { + this.configuration = configuration; + } + + @Override + public void validate() + { + try + { + configuration.validateSslConfiguration(); + } + catch (Throwable throwable) + { + throw new RuntimeException("SSL is misconfigured", throwable); + } + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidatable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidatable.java new file mode 100644 index 0000000..0ad5ab0 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidatable.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +/** + * An interface for a class that requires and can perform startup validation using {@link StartupValidator} + */ +public interface StartupValidatable +{ + /** + * Performs startup validation using {@link StartupValidator} with currently registered {@link StartupValidation}s, + * throws a {@link RuntimeException} if any violations are found, + * needs to be invoked once per execution before any actual work is started + */ + void startupValidate(); +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidation.java new file mode 100644 index 0000000..235fb5a --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidation.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An interface that has to be implemented by all startup validations + */ +@FunctionalInterface +public interface StartupValidation +{ + Logger LOGGER = LoggerFactory.getLogger(StartupValidation.class); + + void validate(); + + default void perform() + { + try + { + LOGGER.info("Performing startup validation with " + getClass()); + validate(); + } + catch (Throwable throwable) + { + String message = "Failed startup validation with " + getClass(); + LOGGER.error(message, throwable); + throw new RuntimeException(message, throwable); + } + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidator.java new file mode 100644 index 0000000..f48fba0 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A singleton class for performing the startup validation + * using a list of necessary {@link StartupValidation} instances + */ +public final class StartupValidator +{ + private static final Logger LOGGER = LoggerFactory.getLogger(StartupValidator.class); + private static final StartupValidator INSTANCE = new StartupValidator(); + private static final String DISABLE = "SKIP_STARTUP_VALIDATIONS"; + + private final List<StartupValidation> validations = new ArrayList<>(); + + private StartupValidator() + { + } + + public static StartupValidator instance() + { + return INSTANCE; + } + + public void register(StartupValidation validation) + { + validations.add(validation); + } + + @VisibleForTesting + void reset() + { + validations.clear(); + } + + public void perform() + { + if (enabled()) + { + LOGGER.info("Performing startup validations"); + validations.forEach(StartupValidation::perform); + LOGGER.info("Completed startup validations"); + } + else + { + LOGGER.info("Skipping startup validations"); + } + } + + public boolean enabled() + { + return System.getProperty(DISABLE) == null; + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/TrustStoreValidation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/TrustStoreValidation.java new file mode 100644 index 0000000..ab4fec0 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/TrustStoreValidation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import java.io.IOException; +import java.io.InputStream; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.cert.Certificate; +import java.util.Enumeration; +import java.util.function.Supplier; + +import org.apache.cassandra.secrets.SecretsProvider; +import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; +import org.apache.cassandra.spark.utils.Throwing; +import org.jetbrains.annotations.NotNull; + +/** + * A startup validation that checks the TrustStore + */ +public class TrustStoreValidation implements StartupValidation +{ + private final boolean configured; + private final String type; + private final char[] password; + private final Supplier<InputStream> stream; + + public TrustStoreValidation(@NotNull SecretsProvider secrets) + { + configured = secrets.hasTrustStoreSecrets(); + type = secrets.trustStoreType(); + password = secrets.trustStorePassword(); + stream = Throwing.supplier(() -> secrets.trustStoreInputStream()); + } + + public TrustStoreValidation(@NotNull BulkSparkConf configuration) + { + configured = configuration.hasTruststoreAndTruststorePassword(); + type = configuration.getTrustStoreTypeOrDefault(); + password = configuration.getTrustStorePasswordOrDefault().toCharArray(); + stream = () -> configuration.getTrustStore(); + } + + @Override + public void validate() + { + try + { + if (!configured) + { + return; // TrustStore is optional + } + + KeyStore trustStore = KeyStore.getInstance(type); + trustStore.load(stream.get(), password); + if (trustStore.size() == 0) + { + throw new RuntimeException("TrustStore is empty"); + } + + for (Enumeration<String> aliases = trustStore.aliases(); aliases.hasMoreElements();) + { + Certificate certificate = trustStore.getCertificate(aliases.nextElement()); + if (certificate != null) + { + return; // TrustStore contains a certificate + } + } + throw new RuntimeException("TrustStore contains no certificates"); + } + catch (IOException | GeneralSecurityException exception) + { + throw new RuntimeException("TrustStore is misconfigured", exception); + } + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/TestSecretsProvider.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/TestSecretsProvider.java new file mode 100644 index 0000000..ce1a104 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/TestSecretsProvider.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.secrets; + +import java.io.InputStream; +import java.nio.file.Path; +import java.util.Map; + +/** + * A test implementation of {@link SecretsProvider} that reads KeyStore/TrustStore from resources + */ +public final class TestSecretsProvider implements SecretsProvider +{ + private enum Kind + { + None, + KeyStore, + TrustStore + } + + private final Kind kind; + private final String type; + private final String resource; + private final String password; + + public static SecretsProvider notConfigured() + { + return new TestSecretsProvider(Kind.None, null, null, null); + } + + public static SecretsProvider forKeyStore(String type, String resource, String password) + { + return new TestSecretsProvider(Kind.KeyStore, type, resource, password); + } + + public static SecretsProvider forTrustStore(String type, String resource, String password) + { + return new TestSecretsProvider(Kind.TrustStore, type, resource, password); + } + + private TestSecretsProvider(Kind kind, String type, String resource, String password) + { + this.kind = kind; + this.type = type; + this.resource = "/tests/validation/" + resource; + this.password = password; + } + + // KeyStore + + @Override + public boolean hasKeyStoreSecrets() + { + return kind == Kind.KeyStore; + } + + @Override + public String keyStoreType() + { + return kind == Kind.KeyStore ? type : null; + } + + @Override + public InputStream keyStoreInputStream() + { + return kind == Kind.KeyStore ? getClass().getResourceAsStream(resource) : null; + } + + @Override + public char[] keyStorePassword() + { + return kind == Kind.KeyStore ? password.toCharArray() : null; + } + + // TrustStore + + @Override + public boolean hasTrustStoreSecrets() + { + return kind == Kind.TrustStore; + } + + @Override + public String trustStoreType() + { + return kind == Kind.TrustStore ? type : null; + } + + @Override + public InputStream trustStoreInputStream() + { + return kind == Kind.TrustStore ? getClass().getResourceAsStream(resource) : null; + } + + @Override + public char[] trustStorePassword() + { + return kind == Kind.TrustStore ? password.toCharArray() : null; + } + + // Miscellaneous + + @Override + public void initialize(Map<String, String> options) + { + } + + @Override + public void validateMutualTLS() + { + } + + @Override + public String secretByName(String secretName) + { + return null; + } + + @Override + public Path secretsPath() + { + return null; + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index b6d3ff5..33e0807 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -50,6 +50,7 @@ import org.apache.cassandra.spark.common.schema.ColumnType; import org.apache.cassandra.spark.common.schema.ColumnTypes; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.validation.StartupValidator; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; @@ -415,4 +416,12 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo { return "keyspace.table"; } + + // Startup Validation + + @Override + public void startupValidate() + { + StartupValidator.instance().perform(); + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/KeyStoreValidationTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/KeyStoreValidationTests.java new file mode 100644 index 0000000..2dd7b33 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/KeyStoreValidationTests.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.secrets.SecretsProvider; +import org.apache.cassandra.secrets.TestSecretsProvider; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests that cover startup validation of a KeyStore + */ +public class KeyStoreValidationTests +{ + @Test + public void testUnconfiguredKeyStore() + { + SecretsProvider secrets = TestSecretsProvider.notConfigured(); + KeyStoreValidation validation = new KeyStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testMissingKeyStore() + { + SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", "keystore-missing.p12", "qwerty"); + KeyStoreValidation validation = new KeyStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testMalformedKeyStore() + { + SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", "keystore-malformed.p12", "qwerty"); + KeyStoreValidation validation = new KeyStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testEmptyKeyStore() + { + SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", "keystore-empty.p12", "qwerty"); + KeyStoreValidation validation = new KeyStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testInvalidKeyStore() + { + SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", "keystore-secret.p12", "qwerty"); + KeyStoreValidation validation = new KeyStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testValidKeyStore() + { + SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", "keystore-private.p12", "qwerty"); + KeyStoreValidation validation = new KeyStoreValidation(secrets); + + assertDoesNotThrow(validation::perform); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/StartupValidatorTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/StartupValidatorTests.java new file mode 100644 index 0000000..07cdd81 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/StartupValidatorTests.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests that cover basic functioning of the startup validation logic + */ +public class StartupValidatorTests +{ + @BeforeEach + public void beforeEach() + { + StartupValidator.instance().reset(); + } + + @Test + public void testWithoutValidations() + { + assertDoesNotThrow(StartupValidator.instance()::perform); + } + + @Test + public void testSucceedingValidations() + { + StartupValidator.instance().register(TestValidation.succeeding()); + StartupValidator.instance().register(TestValidation.succeeding()); + StartupValidator.instance().register(TestValidation.succeeding()); + + assertDoesNotThrow(StartupValidator.instance()::perform); + } + + @Test + public void testFailingValidations() + { + StartupValidator.instance().register(TestValidation.succeeding()); + StartupValidator.instance().register(TestValidation.failing()); + StartupValidator.instance().register(TestValidation.succeeding()); + + RuntimeException exception = assertThrows(RuntimeException.class, StartupValidator.instance()::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @AfterAll + public static void afterAll() + { + StartupValidator.instance().reset(); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TestValidation.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TestValidation.java new file mode 100644 index 0000000..fef0bf2 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TestValidation.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +/** + * A test implementation of {@link StartupValidation} that can succeed or fail as needed + */ +public final class TestValidation implements StartupValidation +{ + private final boolean succeed; + + public static TestValidation failing() + { + return new TestValidation(false); + } + + public static TestValidation succeeding() + { + return new TestValidation(true); + } + + private TestValidation(boolean succeed) + { + this.succeed = succeed; + } + + @Override + public void validate() + { + if (!succeed) + { + throw new RuntimeException("Failing test validation"); + } + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TrustStoreValidationTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TrustStoreValidationTests.java new file mode 100644 index 0000000..9176030 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TrustStoreValidationTests.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.validation; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.secrets.SecretsProvider; +import org.apache.cassandra.secrets.TestSecretsProvider; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests that cover startup validation of a TrustStore + */ +public class TrustStoreValidationTests +{ + @Test + public void testUnconfiguredTrustStore() + { + SecretsProvider secrets = TestSecretsProvider.notConfigured(); + TrustStoreValidation validation = new TrustStoreValidation(secrets); + + assertDoesNotThrow(validation::perform); // TrustStore is optional + } + + @Test + public void testMissingTrustStore() + { + SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", "keystore-missing.p12", "qwerty"); + TrustStoreValidation validation = new TrustStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testMalformedTrustStore() + { + SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", "keystore-malformed.p12", "qwerty"); + TrustStoreValidation validation = new TrustStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testEmptyTrustStore() + { + SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", "keystore-empty.p12", "qwerty"); + TrustStoreValidation validation = new TrustStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testInvalidTrustStore() + { + SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", "keystore-secret.p12", "qwerty"); + TrustStoreValidation validation = new TrustStoreValidation(secrets); + + RuntimeException exception = assertThrows(RuntimeException.class, validation::perform); + assertTrue(exception.getMessage().startsWith("Failed startup validation")); + assertTrue(exception.getCause() instanceof RuntimeException); + } + + @Test + public void testValidTrustStore() + { + SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", "keystore-certificate.p12", "qwerty"); + TrustStoreValidation validation = new TrustStoreValidation(secrets); + + assertDoesNotThrow(validation::perform); + } +} diff --git a/cassandra-analytics-core/src/test/resources/validation/keystore-certificate.p12 b/cassandra-analytics-core/src/test/resources/validation/keystore-certificate.p12 new file mode 100644 index 0000000..3e1dfe0 Binary files /dev/null and b/cassandra-analytics-core/src/test/resources/validation/keystore-certificate.p12 differ diff --git a/cassandra-analytics-core/src/test/resources/validation/keystore-empty.p12 b/cassandra-analytics-core/src/test/resources/validation/keystore-empty.p12 new file mode 100644 index 0000000..8e86e8f Binary files /dev/null and b/cassandra-analytics-core/src/test/resources/validation/keystore-empty.p12 differ diff --git a/cassandra-analytics-core/src/test/resources/validation/keystore-malformed.p12 b/cassandra-analytics-core/src/test/resources/validation/keystore-malformed.p12 new file mode 100644 index 0000000..f228909 --- /dev/null +++ b/cassandra-analytics-core/src/test/resources/validation/keystore-malformed.p12 @@ -0,0 +1 @@ +qwerty \ No newline at end of file diff --git a/cassandra-analytics-core/src/test/resources/validation/keystore-private.p12 b/cassandra-analytics-core/src/test/resources/validation/keystore-private.p12 new file mode 100644 index 0000000..f6e226d Binary files /dev/null and b/cassandra-analytics-core/src/test/resources/validation/keystore-private.p12 differ diff --git a/cassandra-analytics-core/src/test/resources/validation/keystore-secret.p12 b/cassandra-analytics-core/src/test/resources/validation/keystore-secret.p12 new file mode 100644 index 0000000..8c30c5b Binary files /dev/null and b/cassandra-analytics-core/src/test/resources/validation/keystore-secret.p12 differ diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/Throwing.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/Throwing.java index f4a5e4e..83d25dc 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/Throwing.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/Throwing.java @@ -26,6 +26,26 @@ public final class Throwing throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } + @FunctionalInterface + public interface Supplier<T> + { + T get() throws Exception; + } + + public static <T> java.util.function.Supplier<T> supplier(Supplier<T> supplier) + { + return () -> { + try + { + return supplier.get(); + } + catch (Exception exception) + { + throw new RuntimeException(exception); + } + }; + } + @FunctionalInterface public interface Consumer<T> { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org