This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f24951a  CASSANDRA-18810: Cassandra Analytics Start-Up Validation
f24951a is described below

commit f24951ab6ea2b1e9af4013b030675c70d31adb90
Author: Yuriy Semchyshyn <yu...@semchyshyn.com>
AuthorDate: Mon Aug 14 14:09:12 2023 -0500

    CASSANDRA-18810: Cassandra Analytics Start-Up Validation
    
    Patch by Yuriy Semchyshyn; Reviewed by Dinesh Joshi, Francisco Guerrero, 
Yifan Cai for CASSANDRA-18810
---
 CHANGES.txt                                        |   1 +
 build.gradle                                       |   1 +
 .../java/org/apache/cassandra/clients/Sidecar.java |  36 ++++--
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  18 ++-
 .../bulkwriter/CassandraBulkWriterContext.java     |  10 +-
 .../spark/bulkwriter/CassandraClusterInfo.java     |   8 ++
 .../spark/bulkwriter/CassandraContext.java         |  16 ++-
 .../cassandra/spark/bulkwriter/ClusterInfo.java    |   3 +-
 .../cassandra/spark/bulkwriter/RecordWriter.java   |   2 +
 .../cassandra/spark/data/CassandraDataLayer.java   |  20 ++-
 .../spark/data/CassandraDataSourceHelper.java      |   3 +
 .../spark/validation/CassandraValidation.java      |  58 +++++++++
 .../spark/validation/KeyStoreValidation.java       |  94 ++++++++++++++
 .../spark/validation/SidecarValidation.java        |  58 +++++++++
 .../cassandra/spark/validation/SslValidation.java  |  48 +++++++
 .../spark/validation/StartupValidatable.java       |  33 +++++
 .../spark/validation/StartupValidation.java        |  49 +++++++
 .../spark/validation/StartupValidator.java         |  79 ++++++++++++
 .../spark/validation/TrustStoreValidation.java     |  93 ++++++++++++++
 .../cassandra/secrets/TestSecretsProvider.java     | 141 +++++++++++++++++++++
 .../spark/bulkwriter/MockBulkWriterContext.java    |   9 ++
 .../spark/validation/KeyStoreValidationTests.java  |  99 +++++++++++++++
 .../spark/validation/StartupValidatorTests.java    |  74 +++++++++++
 .../cassandra/spark/validation/TestValidation.java |  52 ++++++++
 .../validation/TrustStoreValidationTests.java      |  97 ++++++++++++++
 .../resources/validation/keystore-certificate.p12  | Bin 0 -> 1046 bytes
 .../test/resources/validation/keystore-empty.p12   | Bin 0 -> 103 bytes
 .../resources/validation/keystore-malformed.p12    |   1 +
 .../test/resources/validation/keystore-private.p12 | Bin 0 -> 2520 bytes
 .../test/resources/validation/keystore-secret.p12  | Bin 0 -> 405 bytes
 .../org/apache/cassandra/spark/utils/Throwing.java |  20 +++
 31 files changed, 1102 insertions(+), 21 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 37b126d..a26e455 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Cassandra Analytics Start-Up Validation (CASSANDRA-18810)
  * Expose per partition on-disk usage through new DataFrame that utilizes the 
Index.db SSTable file components (CASSANDRA-18683)
  * Fix bulk writes with Buffered RowBufferMode (CASSANDRA-18692)
  * Minor Refactoring to Improve Code Reusability (CASSANDRA-18684)
diff --git a/build.gradle b/build.gradle
index 837c6fe..eb689a4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -70,6 +70,7 @@ tasks.register('buildIgnoreRatList', Exec) {
   description 'Builds a list of ignored files for the rat task from the 
unversioned git files'
   commandLine 'bash', '-c', 'git clean --force -d --dry-run -x | cut -c 14-'
   doFirst {
+    Files.createDirectories(file("${buildDir}").toPath())
     standardOutput new FileOutputStream("${buildDir}/.rat-excludes.txt")
   }
   // allows task to fail when git/cut commands are unavailable or fail
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 0871ddd..f1f7c1f 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -37,7 +37,8 @@ import o.a.c.sidecar.client.shaded.io.vertx.core.VertxOptions;
 import org.apache.cassandra.secrets.SecretsProvider;
 import org.apache.cassandra.sidecar.client.HttpClientConfig;
 import org.apache.cassandra.sidecar.client.SidecarClient;
-import org.apache.cassandra.sidecar.client.SidecarConfig;
+import org.apache.cassandra.sidecar.client.SidecarClientConfig;
+import org.apache.cassandra.sidecar.client.SidecarClientConfigImpl;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
 import org.apache.cassandra.sidecar.client.VertxHttpClient;
@@ -49,6 +50,10 @@ import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
 import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.utils.BuildInfo;
 import org.apache.cassandra.spark.utils.MapUtils;
+import org.apache.cassandra.spark.validation.KeyStoreValidation;
+import org.apache.cassandra.spark.validation.SslValidation;
+import org.apache.cassandra.spark.validation.StartupValidator;
+import org.apache.cassandra.spark.validation.TrustStoreValidation;
 
 import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_OVERRIDE;
 import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_CHUNK_BUFFER_SIZE;
@@ -98,15 +103,18 @@ public final class Sidecar
                       
.trustStoreInputStream(secretsProvider.trustStoreInputStream())
                       
.trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
                       .trustStoreType(secretsProvider.trustStoreType());
+
+            StartupValidator.instance().register(new 
KeyStoreValidation(secretsProvider));
+            StartupValidator.instance().register(new 
TrustStoreValidation(secretsProvider));
         }
 
         HttpClientConfig httpClientConfig = builder.build();
 
-        SidecarConfig sidecarConfig = new SidecarConfig.Builder()
-                                      .maxRetries(config.maxRetries())
-                                      .retryDelayMillis(config.millisToSleep())
-                                      
.maxRetryDelayMillis(config.maxMillisToSleep())
-                                      .build();
+        SidecarClientConfig sidecarConfig = SidecarClientConfigImpl.builder()
+                                            .maxRetries(config.maxRetries())
+                                            
.retryDelayMillis(config.millisToSleep())
+                                            
.maxRetryDelayMillis(config.maxMillisToSleep())
+                                            .build();
 
         return buildClient(sidecarConfig, vertx, httpClientConfig, 
sidecarInstancesProvider);
     }
@@ -129,16 +137,20 @@ public final class Sidecar
                                             
.ssl(conf.hasKeystoreAndKeystorePassword())
                                             .build();
 
-        SidecarConfig sidecarConfig = new SidecarConfig.Builder()
-                                      .maxRetries(5)
-                                      .retryDelayMillis(200)
-                                      .maxRetryDelayMillis(500)
-                                      .build();
+        StartupValidator.instance().register(new SslValidation(conf));
+        StartupValidator.instance().register(new KeyStoreValidation(conf));
+        StartupValidator.instance().register(new TrustStoreValidation(conf));
+
+        SidecarClientConfig sidecarConfig = SidecarClientConfigImpl.builder()
+                                            .maxRetries(5)
+                                            .retryDelayMillis(200)
+                                            .maxRetryDelayMillis(500)
+                                            .build();
 
         return buildClient(sidecarConfig, vertx, httpClientConfig, 
sidecarInstancesProvider);
     }
 
-    public static SidecarClient buildClient(SidecarConfig sidecarConfig,
+    public static SidecarClient buildClient(SidecarClientConfig sidecarConfig,
                                             Vertx vertx,
                                             HttpClientConfig httpClientConfig,
                                             SidecarInstancesProvider 
clusterConfig)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 798f259..298d204 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -200,10 +200,17 @@ public class BulkSparkConf implements Serializable
         }
     }
 
-    /*
-     * This method Will throw if the SSL configuration is incorrect (PATH 
provided w/o password, for example)
+    /**
+     * Validates the SSL configuration present and throws an exception if it 
is incorrect
+     *
+     * @throws NullPointerException if the mTLS KeyStore password is provided,
+     *                              but both file path and base64 string are 
missing;
+     *                              or if either mTLS TrustStore file path or 
base64 string is provided,
+     *                              but the password is missing
+     * @throws IllegalArgumentException if the mTLS TrustStore password is 
provided,
+     *                                  but both file path and base64 string 
are missing
      */
-    protected void validateSslConfiguration()
+    public void validateSslConfiguration()
     {
         if (getKeyStorePassword() != null)
         {
@@ -479,4 +486,9 @@ public class BulkSparkConf implements Serializable
     {
         return keystorePassword != null && (keystorePath != null || 
keystoreBase64Encoded != null);
     }
+
+    public boolean hasTruststoreAndTruststorePassword()
+    {
+        return truststorePassword != null && (truststorePath != null || 
truststoreBase64Encoded != null);
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 3789b63..149df99 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -57,11 +57,12 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
     private final SchemaInfo schemaInfo;
 
     private CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
+                                       @NotNull CassandraClusterInfo 
clusterInfo,
                                        @NotNull StructType dfSchema,
                                        SparkContext sparkContext)
     {
         this.conf = conf;
-        clusterInfo = new CassandraClusterInfo(conf);
+        this.clusterInfo = clusterInfo;
         CassandraRing<RingInstance> ring = clusterInfo.getRing(true);
         jobInfo = new CassandraJobInfo(conf,
                                        new TokenPartitioner(ring, 
conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores()));
@@ -93,10 +94,15 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
         Preconditions.checkNotNull(dfSchema);
 
         BulkSparkConf conf = new BulkSparkConf(sparkContext.getConf(), 
strOptions);
-        CassandraBulkWriterContext bulkWriterContext = new 
CassandraBulkWriterContext(conf, dfSchema, sparkContext);
+        CassandraClusterInfo clusterInfo = new CassandraClusterInfo(conf);
+
+        clusterInfo.startupValidate();
+
+        CassandraBulkWriterContext bulkWriterContext = new 
CassandraBulkWriterContext(conf, clusterInfo, dfSchema, sparkContext);
         
ShutdownHookManager.addShutdownHook(org.apache.spark.util.ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY(),
                                             
ScalaFunctions.wrapLambda(bulkWriterContext::shutdown));
         bulkWriterContext.dialHome(sparkContext.version());
+
         return bulkWriterContext;
     }
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index e6f145e..94663d3 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -534,4 +534,12 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
         }
         return false;
     }
+
+    // Startup Validation
+
+    @Override
+    public void startupValidate()
+    {
+        getCassandraContext().startupValidate();
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
index e5b6853..5a01845 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
@@ -30,9 +30,13 @@ import org.apache.cassandra.clients.Sidecar;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
+import org.apache.cassandra.spark.validation.CassandraValidation;
+import org.apache.cassandra.spark.validation.SidecarValidation;
+import org.apache.cassandra.spark.validation.StartupValidatable;
+import org.apache.cassandra.spark.validation.StartupValidator;
 import org.jetbrains.annotations.NotNull;
 
-public class CassandraContext implements Closeable
+public class CassandraContext implements StartupValidatable, Closeable
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraContext.class);
     @NotNull
@@ -96,4 +100,14 @@ public class CassandraContext implements Closeable
     {
         return conf;
     }
+
+    // Startup Validation
+
+    @Override
+    public void startupValidate()
+    {
+        StartupValidator.instance().register(new 
SidecarValidation(sidecarClient));
+        StartupValidator.instance().register(new 
CassandraValidation(sidecarClient));
+        StartupValidator.instance().perform();
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
index 5b65a55..3d67093 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
@@ -27,8 +27,9 @@ import 
org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
 import org.apache.cassandra.spark.common.client.InstanceState;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.validation.StartupValidatable;
 
-public interface ClusterInfo extends Serializable
+public interface ClusterInfo extends StartupValidatable, Serializable
 {
     void refreshClusterInfo();
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 426cb8a..b8bc803 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -77,6 +77,8 @@ public class RecordWriter implements Serializable
         this.columnNames = columnNames;
         this.taskContextSupplier = taskContextSupplier;
         this.tableWriterSupplier = tableWriterSupplier;
+
+        writerContext.cluster().startupValidate();
     }
 
     private Range<BigInteger> getTokenRange(TaskContext taskContext)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 39c69c1..a95b6aa 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -96,6 +96,10 @@ import org.apache.cassandra.spark.utils.CqlUtils;
 import org.apache.cassandra.spark.utils.MapUtils;
 import org.apache.cassandra.spark.utils.ScalaFunctions;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.spark.validation.CassandraValidation;
+import org.apache.cassandra.spark.validation.SidecarValidation;
+import org.apache.cassandra.spark.validation.StartupValidatable;
+import org.apache.cassandra.spark.validation.StartupValidator;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.util.ShutdownHookManager;
 import org.jetbrains.annotations.NotNull;
@@ -103,7 +107,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.cassandra.spark.utils.Properties.NODE_STATUS_NOT_CONSIDERED;
 
-public class CassandraDataLayer extends PartitionedDataLayer implements 
Serializable
+public class CassandraDataLayer extends PartitionedDataLayer implements 
StartupValidatable, Serializable
 {
     private static final long serialVersionUID = -9038926850642710787L;
 
@@ -200,7 +204,8 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements Serializ
             aliasLastModifiedTimestamp(this.requestedFeatures, 
this.lastModifiedTimestampField);
         }
         this.rfMap = rfMap;
-        initInstanceMap();
+        this.initInstanceMap();
+        this.startupValidate();
     }
 
     public void initialize(@NotNull ClientConfig options)
@@ -639,6 +644,16 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements Serializ
         return new CassandraRing(partitioner, keyspace, replicationFactor, 
instances);
     }
 
+    // Startup Validation
+
+    @Override
+    public void startupValidate()
+    {
+        StartupValidator.instance().register(new SidecarValidation(sidecar));
+        StartupValidator.instance().register(new CassandraValidation(sidecar));
+        StartupValidator.instance().perform();
+    }
+
     // JDK Serialization
 
     @SuppressWarnings("unchecked")
@@ -685,6 +700,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements Serializ
         }
         this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
         this.initInstanceMap();
+        this.startupValidate();
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException, 
ClassNotFoundException
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
index 7f00e0a..08ea508 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
@@ -126,6 +126,9 @@ public final class CassandraDataSourceHelper
                                                               
Sidecar.ClientConfig.create(options),
                                                               
SslConfig.create(options));
         initializeDataLayerFn.accept(dataLayer, config);
+
+        dataLayer.startupValidate();
+
         return dataLayer;
     }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/CassandraValidation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/CassandraValidation.java
new file mode 100644
index 0000000..148200d
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/CassandraValidation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.common.data.HealthResponse;
+
+/**
+ * A startup validation that checks the connectivity and health of Cassandra
+ */
+public class CassandraValidation implements StartupValidation
+{
+    private static final int TIMEOUT_SECONDS = 30;
+
+    private final SidecarClient sidecar;
+
+    public CassandraValidation(SidecarClient sidecar)
+    {
+        this.sidecar = sidecar;
+    }
+
+    @Override
+    public void validate()
+    {
+        HealthResponse health;
+        try
+        {
+            health = sidecar.cassandraHealth().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        }
+        catch (Throwable throwable)
+        {
+            throw new RuntimeException("Sidecar is unreachable", throwable);
+        }
+        if (!health.isOk())
+        {
+            throw new RuntimeException("Cassandra is not healthy: " + 
health.status());
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/KeyStoreValidation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/KeyStoreValidation.java
new file mode 100644
index 0000000..4ee2faa
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/KeyStoreValidation.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Enumeration;
+import java.util.function.Supplier;
+
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.utils.Throwing;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A startup validation that checks the KeyStore
+ */
+public class KeyStoreValidation implements StartupValidation
+{
+    private final boolean configured;
+    private final String type;
+    private final char[] password;
+    private final Supplier<InputStream> stream;
+
+    public KeyStoreValidation(@NotNull SecretsProvider secrets)
+    {
+        configured = secrets.hasKeyStoreSecrets();
+        type = secrets.keyStoreType();
+        password = secrets.keyStorePassword();
+        stream = Throwing.supplier(() -> secrets.keyStoreInputStream());
+    }
+
+    public KeyStoreValidation(@NotNull BulkSparkConf configuration)
+    {
+        configured = configuration.hasKeystoreAndKeystorePassword();
+        type = configuration.getKeyStoreTypeOrDefault();
+        password = configuration.getKeyStorePassword().toCharArray();
+        stream = () -> configuration.getKeyStore();
+    }
+
+    @Override
+    public void validate()
+    {
+        try
+        {
+            if (!configured)
+            {
+                throw new RuntimeException("KeyStore is not configured");
+            }
+
+            KeyStore keyStore = KeyStore.getInstance(type);
+            keyStore.load(stream.get(), password);
+            if (keyStore.size() == 0)
+            {
+                throw new RuntimeException("KeyStore is empty");
+            }
+
+            for (Enumeration<String> aliases = keyStore.aliases(); 
aliases.hasMoreElements();)
+            {
+                Key key = keyStore.getKey(aliases.nextElement(), password);
+                if (key != null && key instanceof PrivateKey)
+                {
+                    return;  // KeyStore contains a private key
+                }
+            }
+            throw new RuntimeException("KeyStore contains no private keys");
+        }
+        catch (IOException | GeneralSecurityException exception)
+        {
+            throw new RuntimeException("KeyStore is misconfigured", exception);
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SidecarValidation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SidecarValidation.java
new file mode 100644
index 0000000..fda7b18
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SidecarValidation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.common.data.HealthResponse;
+
+/**
+ * A startup validation that checks the connectivity and health of Sidecar
+ */
+public class SidecarValidation implements StartupValidation
+{
+    private static final int TIMEOUT_SECONDS = 30;
+
+    private final SidecarClient sidecar;
+
+    public SidecarValidation(SidecarClient sidecar)
+    {
+        this.sidecar = sidecar;
+    }
+
+    @Override
+    public void validate()
+    {
+        HealthResponse health;
+        try
+        {
+            health = sidecar.sidecarHealth().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        }
+        catch (Throwable throwable)
+        {
+            throw new RuntimeException("Sidecar is unreachable", throwable);
+        }
+        if (!health.isOk())
+        {
+            throw new RuntimeException("Sidecar is unhealthy: " + 
health.status());
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SslValidation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SslValidation.java
new file mode 100644
index 0000000..52edc13
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/SslValidation.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+
+/**
+ * A startup validation that checks the SSL configuration
+ */
+public class SslValidation implements StartupValidation
+{
+    private final BulkSparkConf configuration;
+
+    public SslValidation(BulkSparkConf configuration)
+    {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void validate()
+    {
+        try
+        {
+            configuration.validateSslConfiguration();
+        }
+        catch (Throwable throwable)
+        {
+            throw new RuntimeException("SSL is misconfigured", throwable);
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidatable.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidatable.java
new file mode 100644
index 0000000..0ad5ab0
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidatable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+/**
+ * An interface for a class that requires and can perform startup validation 
using {@link StartupValidator}
+ */
+public interface StartupValidatable
+{
+    /**
+     * Performs startup validation using {@link StartupValidator} with 
currently registered {@link StartupValidation}s,
+     * throws a {@link RuntimeException} if any violations are found,
+     * needs to be invoked once per execution before any actual work is started
+     */
+    void startupValidate();
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidation.java
new file mode 100644
index 0000000..235fb5a
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An interface that has to be implemented by all startup validations
+ */
+@FunctionalInterface
+public interface StartupValidation
+{
+    Logger LOGGER = LoggerFactory.getLogger(StartupValidation.class);
+
+    void validate();
+
+    default void perform()
+    {
+        try
+        {
+            LOGGER.info("Performing startup validation with " + getClass());
+            validate();
+        }
+        catch (Throwable throwable)
+        {
+            String message = "Failed startup validation with " + getClass();
+            LOGGER.error(message, throwable);
+            throw new RuntimeException(message, throwable);
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidator.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidator.java
new file mode 100644
index 0000000..f48fba0
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/StartupValidator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A singleton class for performing the startup validation
+ * using a list of necessary {@link StartupValidation} instances
+ */
+public final class StartupValidator
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StartupValidator.class);
+    private static final StartupValidator INSTANCE = new StartupValidator();
+    private static final String DISABLE = "SKIP_STARTUP_VALIDATIONS";
+
+    private final List<StartupValidation> validations = new ArrayList<>();
+
+    private StartupValidator()
+    {
+    }
+
+    public static StartupValidator instance()
+    {
+        return INSTANCE;
+    }
+
+    public void register(StartupValidation validation)
+    {
+        validations.add(validation);
+    }
+
+    @VisibleForTesting
+    void reset()
+    {
+        validations.clear();
+    }
+
+    public void perform()
+    {
+        if (enabled())
+        {
+            LOGGER.info("Performing startup validations");
+            validations.forEach(StartupValidation::perform);
+            LOGGER.info("Completed startup validations");
+        }
+        else
+        {
+            LOGGER.info("Skipping startup validations");
+        }
+    }
+
+    public boolean enabled()
+    {
+        return System.getProperty(DISABLE) == null;
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/TrustStoreValidation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/TrustStoreValidation.java
new file mode 100644
index 0000000..ab4fec0
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/validation/TrustStoreValidation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.util.Enumeration;
+import java.util.function.Supplier;
+
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.utils.Throwing;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A startup validation that checks the TrustStore
+ */
+public class TrustStoreValidation implements StartupValidation
+{
+    private final boolean configured;
+    private final String type;
+    private final char[] password;
+    private final Supplier<InputStream> stream;
+
+    public TrustStoreValidation(@NotNull SecretsProvider secrets)
+    {
+        configured = secrets.hasTrustStoreSecrets();
+        type = secrets.trustStoreType();
+        password = secrets.trustStorePassword();
+        stream = Throwing.supplier(() -> secrets.trustStoreInputStream());
+    }
+
+    public TrustStoreValidation(@NotNull BulkSparkConf configuration)
+    {
+        configured = configuration.hasTruststoreAndTruststorePassword();
+        type = configuration.getTrustStoreTypeOrDefault();
+        password = 
configuration.getTrustStorePasswordOrDefault().toCharArray();
+        stream = () -> configuration.getTrustStore();
+    }
+
+    @Override
+    public void validate()
+    {
+        try
+        {
+            if (!configured)
+            {
+                return;  // TrustStore is optional
+            }
+
+            KeyStore trustStore = KeyStore.getInstance(type);
+            trustStore.load(stream.get(), password);
+            if (trustStore.size() == 0)
+            {
+                throw new RuntimeException("TrustStore is empty");
+            }
+
+            for (Enumeration<String> aliases = trustStore.aliases(); 
aliases.hasMoreElements();)
+            {
+                Certificate certificate = 
trustStore.getCertificate(aliases.nextElement());
+                if (certificate != null)
+                {
+                    return;  // TrustStore contains a certificate
+                }
+            }
+            throw new RuntimeException("TrustStore contains no certificates");
+        }
+        catch (IOException | GeneralSecurityException exception)
+        {
+            throw new RuntimeException("TrustStore is misconfigured", 
exception);
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/TestSecretsProvider.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/TestSecretsProvider.java
new file mode 100644
index 0000000..ce1a104
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/secrets/TestSecretsProvider.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.secrets;
+
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Map;
+
+/**
+ * A test implementation of {@link SecretsProvider} that reads 
KeyStore/TrustStore from resources
+ */
+public final class TestSecretsProvider implements SecretsProvider
+{
+    private enum Kind
+    {
+        None,
+        KeyStore,
+        TrustStore
+    }
+
+    private final Kind kind;
+    private final String type;
+    private final String resource;
+    private final String password;
+
+    public static SecretsProvider notConfigured()
+    {
+        return new TestSecretsProvider(Kind.None, null, null, null);
+    }
+
+    public static SecretsProvider forKeyStore(String type, String resource, 
String password)
+    {
+        return new TestSecretsProvider(Kind.KeyStore, type, resource, 
password);
+    }
+
+    public static SecretsProvider forTrustStore(String type, String resource, 
String password)
+    {
+        return new TestSecretsProvider(Kind.TrustStore, type, resource, 
password);
+    }
+
+    private TestSecretsProvider(Kind kind, String type, String resource, 
String password)
+    {
+        this.kind = kind;
+        this.type = type;
+        this.resource = "/tests/validation/" + resource;
+        this.password = password;
+    }
+
+    // KeyStore
+
+    @Override
+    public boolean hasKeyStoreSecrets()
+    {
+        return kind == Kind.KeyStore;
+    }
+
+    @Override
+    public String keyStoreType()
+    {
+        return kind == Kind.KeyStore ? type : null;
+    }
+
+    @Override
+    public InputStream keyStoreInputStream()
+    {
+        return kind == Kind.KeyStore ? 
getClass().getResourceAsStream(resource) : null;
+    }
+
+    @Override
+    public char[] keyStorePassword()
+    {
+        return kind == Kind.KeyStore ? password.toCharArray() : null;
+    }
+
+    // TrustStore
+
+    @Override
+    public boolean hasTrustStoreSecrets()
+    {
+        return kind == Kind.TrustStore;
+    }
+
+    @Override
+    public String trustStoreType()
+    {
+        return kind == Kind.TrustStore ? type : null;
+    }
+
+    @Override
+    public InputStream trustStoreInputStream()
+    {
+        return kind == Kind.TrustStore ? 
getClass().getResourceAsStream(resource) : null;
+    }
+
+    @Override
+    public char[] trustStorePassword()
+    {
+        return kind == Kind.TrustStore ? password.toCharArray() : null;
+    }
+
+    // Miscellaneous
+
+    @Override
+    public void initialize(Map<String, String> options)
+    {
+    }
+
+    @Override
+    public void validateMutualTLS()
+    {
+    }
+
+    @Override
+    public String secretByName(String secretName)
+    {
+        return null;
+    }
+
+    @Override
+    public Path secretsPath()
+    {
+        return null;
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index b6d3ff5..33e0807 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.spark.common.schema.ColumnType;
 import org.apache.cassandra.spark.common.schema.ColumnTypes;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.validation.StartupValidator;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
@@ -415,4 +416,12 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
     {
         return "keyspace.table";
     }
+
+    // Startup Validation
+
+    @Override
+    public void startupValidate()
+    {
+        StartupValidator.instance().perform();
+    }
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/KeyStoreValidationTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/KeyStoreValidationTests.java
new file mode 100644
index 0000000..2dd7b33
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/KeyStoreValidationTests.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.TestSecretsProvider;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests that cover startup validation of a KeyStore
+ */
+public class KeyStoreValidationTests
+{
+    @Test
+    public void testUnconfiguredKeyStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.notConfigured();
+        KeyStoreValidation validation = new KeyStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testMissingKeyStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", 
"keystore-missing.p12", "qwerty");
+        KeyStoreValidation validation = new KeyStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testMalformedKeyStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", 
"keystore-malformed.p12", "qwerty");
+        KeyStoreValidation validation = new KeyStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testEmptyKeyStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", 
"keystore-empty.p12", "qwerty");
+        KeyStoreValidation validation = new KeyStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testInvalidKeyStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", 
"keystore-secret.p12", "qwerty");
+        KeyStoreValidation validation = new KeyStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testValidKeyStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forKeyStore("PKCS12", 
"keystore-private.p12", "qwerty");
+        KeyStoreValidation validation = new KeyStoreValidation(secrets);
+
+        assertDoesNotThrow(validation::perform);
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/StartupValidatorTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/StartupValidatorTests.java
new file mode 100644
index 0000000..07cdd81
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/StartupValidatorTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests that cover basic functioning of the startup validation logic
+ */
+public class StartupValidatorTests
+{
+    @BeforeEach
+    public void beforeEach()
+    {
+        StartupValidator.instance().reset();
+    }
+
+    @Test
+    public void testWithoutValidations()
+    {
+        assertDoesNotThrow(StartupValidator.instance()::perform);
+    }
+
+    @Test
+    public void testSucceedingValidations()
+    {
+        StartupValidator.instance().register(TestValidation.succeeding());
+        StartupValidator.instance().register(TestValidation.succeeding());
+        StartupValidator.instance().register(TestValidation.succeeding());
+
+        assertDoesNotThrow(StartupValidator.instance()::perform);
+    }
+
+    @Test
+    public void testFailingValidations()
+    {
+        StartupValidator.instance().register(TestValidation.succeeding());
+        StartupValidator.instance().register(TestValidation.failing());
+        StartupValidator.instance().register(TestValidation.succeeding());
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
StartupValidator.instance()::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @AfterAll
+    public static void afterAll()
+    {
+        StartupValidator.instance().reset();
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TestValidation.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TestValidation.java
new file mode 100644
index 0000000..fef0bf2
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TestValidation.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+/**
+ * A test implementation of {@link StartupValidation} that can succeed or fail 
as needed
+ */
+public final class TestValidation implements StartupValidation
+{
+    private final boolean succeed;
+
+    public static TestValidation failing()
+    {
+        return new TestValidation(false);
+    }
+
+    public static TestValidation succeeding()
+    {
+        return new TestValidation(true);
+    }
+
+    private TestValidation(boolean succeed)
+    {
+        this.succeed = succeed;
+    }
+
+    @Override
+    public void validate()
+    {
+        if (!succeed)
+        {
+            throw new RuntimeException("Failing test validation");
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TrustStoreValidationTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TrustStoreValidationTests.java
new file mode 100644
index 0000000..9176030
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/validation/TrustStoreValidationTests.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.validation;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.TestSecretsProvider;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests that cover startup validation of a TrustStore
+ */
+public class TrustStoreValidationTests
+{
+    @Test
+    public void testUnconfiguredTrustStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.notConfigured();
+        TrustStoreValidation validation = new TrustStoreValidation(secrets);
+
+        assertDoesNotThrow(validation::perform);  // TrustStore is optional
+    }
+
+    @Test
+    public void testMissingTrustStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", 
"keystore-missing.p12", "qwerty");
+        TrustStoreValidation validation = new TrustStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testMalformedTrustStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", 
"keystore-malformed.p12", "qwerty");
+        TrustStoreValidation validation = new TrustStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testEmptyTrustStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", 
"keystore-empty.p12", "qwerty");
+        TrustStoreValidation validation = new TrustStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testInvalidTrustStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", 
"keystore-secret.p12", "qwerty");
+        TrustStoreValidation validation = new TrustStoreValidation(secrets);
+
+        RuntimeException exception = assertThrows(RuntimeException.class, 
validation::perform);
+        assertTrue(exception.getMessage().startsWith("Failed startup 
validation"));
+        assertTrue(exception.getCause() instanceof RuntimeException);
+    }
+
+    @Test
+    public void testValidTrustStore()
+    {
+        SecretsProvider secrets = TestSecretsProvider.forTrustStore("PKCS12", 
"keystore-certificate.p12", "qwerty");
+        TrustStoreValidation validation = new TrustStoreValidation(secrets);
+
+        assertDoesNotThrow(validation::perform);
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/resources/validation/keystore-certificate.p12
 
b/cassandra-analytics-core/src/test/resources/validation/keystore-certificate.p12
new file mode 100644
index 0000000..3e1dfe0
Binary files /dev/null and 
b/cassandra-analytics-core/src/test/resources/validation/keystore-certificate.p12
 differ
diff --git 
a/cassandra-analytics-core/src/test/resources/validation/keystore-empty.p12 
b/cassandra-analytics-core/src/test/resources/validation/keystore-empty.p12
new file mode 100644
index 0000000..8e86e8f
Binary files /dev/null and 
b/cassandra-analytics-core/src/test/resources/validation/keystore-empty.p12 
differ
diff --git 
a/cassandra-analytics-core/src/test/resources/validation/keystore-malformed.p12 
b/cassandra-analytics-core/src/test/resources/validation/keystore-malformed.p12
new file mode 100644
index 0000000..f228909
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/resources/validation/keystore-malformed.p12
@@ -0,0 +1 @@
+qwerty
\ No newline at end of file
diff --git 
a/cassandra-analytics-core/src/test/resources/validation/keystore-private.p12 
b/cassandra-analytics-core/src/test/resources/validation/keystore-private.p12
new file mode 100644
index 0000000..f6e226d
Binary files /dev/null and 
b/cassandra-analytics-core/src/test/resources/validation/keystore-private.p12 
differ
diff --git 
a/cassandra-analytics-core/src/test/resources/validation/keystore-secret.p12 
b/cassandra-analytics-core/src/test/resources/validation/keystore-secret.p12
new file mode 100644
index 0000000..8c30c5b
Binary files /dev/null and 
b/cassandra-analytics-core/src/test/resources/validation/keystore-secret.p12 
differ
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/Throwing.java 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/Throwing.java
index f4a5e4e..83d25dc 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/Throwing.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/Throwing.java
@@ -26,6 +26,26 @@ public final class Throwing
         throw new IllegalStateException(getClass() + " is static utility class 
and shall not be instantiated");
     }
 
+    @FunctionalInterface
+    public interface Supplier<T>
+    {
+        T get() throws Exception;
+    }
+
+    public static <T> java.util.function.Supplier<T> supplier(Supplier<T> 
supplier)
+    {
+        return () -> {
+            try
+            {
+                return supplier.get();
+            }
+            catch (Exception exception)
+            {
+                throw new RuntimeException(exception);
+            }
+        };
+    }
+
     @FunctionalInterface
     public interface Consumer<T>
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to