This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 585f89bb42 Support custom StartupCheck implementations via SPI
585f89bb42 is described below

commit 585f89bb4274b1af471ae43aeef01796d0e503d0
Author: nickbar01234 <[email protected]>
AuthorDate: Sun Jan 11 22:01:38 2026 -0500

    Support custom StartupCheck implementations via SPI
    
    patch by Nick Bar; reviewed by Stefan Miklosovic, Jyothsna Konisa for 
CASSANDRA-21093
    
    Co-authored-by: Stefan Miklosovic <[email protected]>
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   1 +
 conf/cassandra.yaml                                |   3 +
 conf/cassandra_latest.yaml                         |   3 +
 examples/startup-checks/README.adoc                |  45 ++++
 examples/startup-checks/build.xml                  |  68 ++++++
 .../service/checks/MyCustomStartupCheck.java       |  81 +++++++
 .../org.apache.cassandra.service.StartupCheck      |  16 ++
 src/java/org/apache/cassandra/config/Config.java   |   3 +-
 .../cassandra/config/DatabaseDescriptor.java       |  18 +-
 .../config/StartupChecksConfiguration.java         | 149 ++++++++++++
 .../cassandra/config/StartupChecksOptions.java     |  97 --------
 .../apache/cassandra/service/CassandraDaemon.java  |   4 +-
 .../cassandra/service/DataResurrectionCheck.java   |  62 +++--
 .../service/FileSystemOwnershipCheck.java          |  32 ++-
 .../org/apache/cassandra/service/StartupCheck.java |  40 +++-
 .../apache/cassandra/service/StartupChecks.java    | 254 ++++++++++++++++-----
 .../test/DataResurrectionCheckTest.java            |  35 +--
 .../config/DatabaseDescriptorRefTest.java          |   4 +-
 .../cassandra/config/StartupCheckOptionsTest.java  | 150 ------------
 .../config/StartupChecksConfigurationTest.java     | 228 ++++++++++++++++++
 .../config/YamlConfigurationLoaderTest.java        |   9 +-
 .../AbstractFilesystemOwnershipCheckTest.java      |  13 +-
 .../cassandra/service/StartupChecksTest.java       | 176 +++++++++++++-
 ...ropertiesBasedFileSystemOwnershipCheckTest.java |   7 +-
 .../YamlBasedFileSystemOwnershipCheckTest.java     |  17 +-
 26 files changed, 1118 insertions(+), 398 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 286b108a8e..e6fa54b38f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Support custom StartupCheck implementations via SPI (CASSANDRA-21093)
  * Make sstableexpiredblockers support human-readable output with SSTable 
sizes (CASSANDRA-20448)
  * Enhance nodetool compactionhistory to report more compaction properities 
(CASSANDRA-20081)
  * Fix initial auto-repairs skipped by too soon check (CASSANDRA-21115)
diff --git a/NEWS.txt b/NEWS.txt
index 724d860ea9..18284b6499 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -136,6 +136,7 @@ New features
       and "length" defined on UTF8 strings. See CASSANDRA-20102 for more 
information.
     - New functions `format_bytes` and `format_time` were added. See 
CASSANDRA-19546.
     - It is possible to use Async-profiler for various profiling scenarios. 
See CASSANDRA-20854.
+    - It is possible to provide custom startup check via Java SPI. See 
CASSANDRA-21093.
 
 Upgrading
 ---------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5f3a935be3..34509ad0c6 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -2673,6 +2673,9 @@ max_security_label_length: 48
 # are configurable (so you can disable them) but these which are enumerated 
bellow.
 # Uncomment the startup checks and configure them appropriately to cover your 
needs.
 #
+# It is possible to implement custom startup checks by implementing 
StatupCheck interface 
+# and placing JAR on a classpath, using SPI mechanism.
+#
 #startup_checks:
 # Verifies correct ownership of attached locations on disk at startup. See 
CASSANDRA-16879 for more details.
 #  check_filesystem_ownership:
diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml
index 17638deadb..891570eb12 100644
--- a/conf/cassandra_latest.yaml
+++ b/conf/cassandra_latest.yaml
@@ -2437,6 +2437,9 @@ default_secondary_index_enabled: true
 # are configurable (so you can disable them) but these which are enumerated 
bellow.
 # Uncomment the startup checks and configure them appropriately to cover your 
needs.
 #
+# It is possible to implement custom startup checks by implementing 
StatupCheck interface
+# and placing JAR on a classpath, using SPI mechanism.
+#
 #startup_checks:
 # Verifies correct ownership of attached locations on disk at startup. See 
CASSANDRA-16879 for more details.
 #  check_filesystem_ownership:
diff --git a/examples/startup-checks/README.adoc 
b/examples/startup-checks/README.adoc
new file mode 100644
index 0000000000..331e85c79a
--- /dev/null
+++ b/examples/startup-checks/README.adoc
@@ -0,0 +1,45 @@
+== Cassandra Startup Check Example
+
+An implementation of `StartupCheck` interface will create custom startup 
check. For the purposes of this example,
+there will be one startup check with the name `my_check`.
+
+If you want to code your own startup check without patching Cassandra 
yourself, you need to do the following:
+
+1. Code against interface `org.apache.cassandra.service.StartupCheck`.
+2. Put the class implementing this interface to 
`META-INF/services/org.apache.cassandra.service.StartupCheck`
+3. Build a JAR both with the implementation and `META-INF` resources, as show 
in this example, and put this JAR onto
+Cassandra's classpath.
+4. When Cassandra starts, it will auto-detect new check by loading 
`MyCustomStartupCheck` in your JAR.
+5. You can code more check classes and add them all into one jar, just add 
another entry into file in step 2.
+
+You can also configure the checks loaded like this in cassandra.yaml under 
`startup_checks` section,
+like following:
+
+----
+startup_checks:
+  my_check:
+    key: value
+----
+
+You can get these options from `StartupChecksConfiguration`, by check's 
`name()`.
+
+=== Installation
+
+----
+$ cd <cassandra_src_dir>/examples/startup-checks
+$ ant install
+----
+
+It will build the startup check and will copy it to `lib` as well as to 
`build/lib/jars`.
+
+You remove it from everywhere by
+
+----
+$ cd <cassandra_src_dir>/examples/startup-checks
+$ ant clean
+----
+
+=== Usage
+
+Follow the logs, you will see the output of the check. You can
+play with options in cassandra.yaml to see what is propagated to that check.
\ No newline at end of file
diff --git a/examples/startup-checks/build.xml 
b/examples/startup-checks/build.xml
new file mode 100644
index 0000000000..df287cdb87
--- /dev/null
+++ b/examples/startup-checks/build.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements.  See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership.  The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License.  You may obtain a copy of the License at
+ ~
+ ~   http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied.  See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<project default="jar" name="startup-check-example">
+       <property name="cassandra.dir" value="../.." />
+       <property name="cassandra.dir.lib" value="${cassandra.dir}/lib" />
+       <property name="cassandra.classes" 
value="${cassandra.dir}/build/classes/main" />
+       <property name="build.src" value="${basedir}/src" />
+       <property name="build.dir" value="${basedir}/build" />
+       <property name="conf.dir" value="${basedir}/conf" />
+       <property name="build.classes" value="${build.dir}/classes" />
+       <property name="final.name" value="startup-check-example" />
+
+       <path id="build.classpath">
+               <fileset dir="${cassandra.dir.lib}">
+                       <include name="**/*.jar" />
+               </fileset>
+               <fileset dir="${cassandra.dir}/build/lib/jars">
+                       <include name="**/*.jar" />
+               </fileset>
+               <pathelement location="${cassandra.classes}" />
+       </path>
+       <target name="init">
+               <mkdir dir="${build.classes}" />
+       </target>
+
+       <target name="build" depends="init">
+               <javac destdir="${build.classes}" debug="true" 
includeantruntime="false">
+                       <src path="${build.src}" />
+                       <classpath refid="build.classpath" />
+               </javac>
+       </target>
+
+       <target name="jar" depends="build">
+               <jar jarfile="${build.dir}/${final.name}.jar">
+                       <fileset dir="${build.classes}" />
+                       <fileset dir="${build.src}/resources"/>
+               </jar>
+       </target>
+
+       <target name="install" depends="jar">
+               <copy verbose="true" file="${build.dir}/${final.name}.jar" 
todir="${cassandra.dir}/lib" overwrite="true"/>
+               <copy verbose="true" file="${build.dir}/${final.name}.jar" 
todir="${cassandra.dir}/build/lib/jars" overwrite="true"/>
+       </target>
+
+       <target name="clean">
+               <delete dir="${build.dir}" />
+               <delete file="${cassandra.dir}/lib/${final.name}.jar"/>
+               <delete 
file="${cassandra.dir}/build/lib/jars/${final.name}.jar"/>
+       </target>
+</project>
diff --git 
a/examples/startup-checks/src/org/apache/cassandra/service/checks/MyCustomStartupCheck.java
 
b/examples/startup-checks/src/org/apache/cassandra/service/checks/MyCustomStartupCheck.java
new file mode 100644
index 0000000000..e974e2524d
--- /dev/null
+++ 
b/examples/startup-checks/src/org/apache/cassandra/service/checks/MyCustomStartupCheck.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.checks;
+
+import java.time.Instant;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.StartupChecksConfiguration;
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.service.StartupCheck;
+
+public class MyCustomStartupCheck implements StartupCheck
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MyCustomStartupCheck.class);
+
+    @Override
+    public String name()
+    {
+        return "my_check";
+    }
+
+    @Override
+    public void execute(StartupChecksConfiguration configuration) throws 
StartupException
+    {
+        if (configuration.isDisabled(name()))
+            return;
+
+        Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        calendar.set(2000, 1, 1);
+        if (Instant.now().isBefore(calendar.toInstant()))
+        {
+            throw new 
StartupException(StartupException.ERR_WRONG_MACHINE_STATE,
+                                       "Cassandra is the database for this 
millennium!");
+        }
+        else
+        {
+            logger.info("Executing " + name() + " with options: " + 
configuration.getConfig(name()));
+        }
+    }
+
+    @Override
+    public boolean isConfigurable()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean isDisabledByDefault()
+    {
+        return false;
+    }
+
+    @Override
+    public void postAction(StartupChecksConfiguration options)
+    {
+        if (options.isDisabled(name()))
+            return;
+
+        logger.info("Executing post-action for " + name());
+    }
+}
diff --git 
a/examples/startup-checks/src/resources/META-INF/services/org.apache.cassandra.service.StartupCheck
 
b/examples/startup-checks/src/resources/META-INF/services/org.apache.cassandra.service.StartupCheck
new file mode 100644
index 0000000000..19da0a508d
--- /dev/null
+++ 
b/examples/startup-checks/src/resources/META-INF/services/org.apache.cassandra.service.StartupCheck
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.cassandra.service.checks.MyCustomStartupCheck
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 2f290d8182..fafcddddc6 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -45,7 +45,6 @@ import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
-import org.apache.cassandra.service.StartupChecks.StartupCheckType;
 import org.apache.cassandra.utils.StorageCompatibilityMode;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.AUTOCOMPACTION_ON_STARTUP_ENABLED;
@@ -1012,7 +1011,7 @@ public class Config
     public volatile DurationSpec.IntSecondsBound 
streaming_slow_events_log_timeout = new DurationSpec.IntSecondsBound("10s");
 
     /** The configuration of startup checks. */
-    public volatile Map<StartupCheckType, Map<String, Object>> startup_checks 
= new HashMap<>();
+    public volatile Map<String, Map<String, Object>> startup_checks = new 
HashMap<>();
 
     public volatile DurationSpec.LongNanosecondsBound repair_state_expires = 
new DurationSpec.LongNanosecondsBound("3d");
     public volatile int repair_state_size = 100_000;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 809a44fdcc..b9f659e19d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -123,6 +123,8 @@ import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.security.JREProvider;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.CacheService.CacheType;
+import org.apache.cassandra.service.FileSystemOwnershipCheck;
+import org.apache.cassandra.service.StartupChecks;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.api.AccordWaitStrategies;
 import org.apache.cassandra.service.consensus.TransactionalMode;
@@ -266,7 +268,7 @@ public class DatabaseDescriptor
      * The configuration for guardrails.
      */
     private static GuardrailsOptions guardrails;
-    private static StartupChecksOptions startupChecksOptions;
+    private static StartupChecksConfiguration startupChecksConfiguration;
 
     private static ImmutableMap<String, SSTableFormat<?, ?>> sstableFormats;
     private static volatile SSTableFormat<?, ?> selectedSSTableFormat;
@@ -1341,14 +1343,22 @@ public class DatabaseDescriptor
         }
     }
 
-    public static StartupChecksOptions getStartupChecksOptions()
+    public static StartupChecksConfiguration getStartupChecksConfiguration()
     {
-        return startupChecksOptions;
+        return startupChecksConfiguration;
     }
 
     private static void applyStartupChecks()
     {
-        startupChecksOptions = new StartupChecksOptions(conf.startup_checks);
+        try
+        {
+            StartupChecks startupChecks = new 
StartupChecks().withDefaultTests().withTest(new 
FileSystemOwnershipCheck()).withServiceLoaderTests();
+            startupChecksConfiguration = new 
StartupChecksConfiguration(startupChecks, conf.startup_checks);
+        }
+        catch (Throwable t)
+        {
+            throw new ConfigurationException("Invalid configuration of 
startup_checks: " + t.getMessage());
+        }
     }
 
     private static String storagedirFor(String type)
diff --git 
a/src/java/org/apache/cassandra/config/StartupChecksConfiguration.java 
b/src/java/org/apache/cassandra/config/StartupChecksConfiguration.java
new file mode 100644
index 0000000000..442b06d5f7
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/StartupChecksConfiguration.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.service.StartupCheck;
+import org.apache.cassandra.service.StartupChecks;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+
+public class StartupChecksConfiguration
+{
+    public static final String ENABLED_PROPERTY = "enabled";
+
+    private final Map<String, Map<String, Object>> options = new HashMap<>();
+    private final StartupChecks startupChecks;
+
+    public StartupChecksConfiguration(StartupChecks startupChecks, Map<String, 
Map<String, Object>> options)
+    {
+        this.options.putAll(new HashMap<>(options));
+        this.startupChecks = startupChecks;
+
+        apply();
+    }
+
+    @VisibleForTesting
+    public StartupCheck getCheck(String name)
+    {
+        return startupChecks.getCheck(name);
+    }
+
+    private StartupCheck getConfigurableCheck(String name)
+    {
+        StartupCheck check = startupChecks.getCheck(name);
+        if (check == null || !check.isConfigurable())
+            return null;
+        else
+            return check;
+    }
+
+    public void set(String name, String key, Object value)
+    {
+        StartupCheck check = getConfigurableCheck(name);
+        if (check == null)
+            return;
+
+        Map<String, Object> checkConfiguration = options.get(name);
+        if (checkConfiguration == null)
+            return;
+
+        checkConfiguration.put(key, value);
+    }
+
+    public void enable(String name)
+    {
+        set(name, ENABLED_PROPERTY, TRUE);
+    }
+
+    public void disable(String name)
+    {
+        set(name, ENABLED_PROPERTY, FALSE);
+    }
+
+    public boolean isEnabled(String name)
+    {
+        Map<String, Object> config = getConfig(name);
+        if (config == null)
+            return false;
+
+        Object enabledBoolean = config.get(ENABLED_PROPERTY);
+        if (enabledBoolean == null)
+            return false;
+
+        return Boolean.parseBoolean(enabledBoolean.toString());
+    }
+
+    public boolean isDisabled(String name)
+    {
+        return !isEnabled(name);
+    }
+
+    public Map<String, Object> getConfig(String name)
+    {
+        return options.get(name);
+    }
+
+    private void apply()
+    {
+        List<String> notExistingCheckNames = new ArrayList<>();
+        List<String> notConfigurableCheckNames = new ArrayList<>();
+
+        for (Map.Entry<String, Map<String, Object>> userConfigEntry : 
options.entrySet())
+        {
+            String key = userConfigEntry.getKey();
+            StartupCheck check = startupChecks.getCheck(key);
+            if (check == null)
+                notExistingCheckNames.add(key);
+            else if (!check.isConfigurable())
+                notConfigurableCheckNames.add(key);
+        }
+
+        if (!notExistingCheckNames.isEmpty())
+            throw new IllegalStateException("There are configuration entries 
for startup checks which do not exist: " + notExistingCheckNames);
+        if (!notConfigurableCheckNames.isEmpty())
+            throw new IllegalStateException("There are configuration entries 
for startup checks which are not configurable: " + notConfigurableCheckNames);
+
+        for (StartupCheck check : startupChecks.getChecks())
+        {
+            String startupCheckName = check.name();
+            Map<String, Object> configMap = 
this.options.computeIfAbsent(startupCheckName, k -> new HashMap<>());
+            if (configMap.containsKey(ENABLED_PROPERTY))
+                configMap.putIfAbsent(ENABLED_PROPERTY, FALSE);
+            else if (check.isDisabledByDefault())
+                configMap.put(ENABLED_PROPERTY, FALSE);
+            else
+                configMap.put(ENABLED_PROPERTY, TRUE);
+        }
+    }
+
+    public void verify() throws StartupException
+    {
+        assert startupChecks != null;
+        startupChecks.verify(this);
+    }
+}
diff --git a/src/java/org/apache/cassandra/config/StartupChecksOptions.java 
b/src/java/org/apache/cassandra/config/StartupChecksOptions.java
deleted file mode 100644
index 6eb31898b2..0000000000
--- a/src/java/org/apache/cassandra/config/StartupChecksOptions.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.config;
-
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.cassandra.service.StartupChecks.StartupCheckType;
-
-import static java.lang.Boolean.FALSE;
-import static java.lang.Boolean.TRUE;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.non_configurable_check;
-
-public class StartupChecksOptions
-{
-    public static final String ENABLED_PROPERTY = "enabled";
-
-    private final Map<StartupCheckType, Map<String, Object>> options = new 
EnumMap<>(StartupCheckType.class);
-
-    public StartupChecksOptions()
-    {
-        this(Collections.emptyMap());
-    }
-
-    public StartupChecksOptions(final Map<StartupCheckType, Map<String, 
Object>> options)
-    {
-        this.options.putAll(options);
-        apply();
-    }
-
-    public void set(final StartupCheckType startupCheckType, final String key, 
final Object value)
-    {
-        if (startupCheckType != non_configurable_check)
-            options.get(startupCheckType).put(key, value);
-    }
-
-    public void enable(final StartupCheckType startupCheckType)
-    {
-        set(startupCheckType, ENABLED_PROPERTY, TRUE);
-    }
-
-    public void disable(final StartupCheckType startupCheckType)
-    {
-        if (startupCheckType != non_configurable_check)
-            set(startupCheckType, ENABLED_PROPERTY, FALSE);
-    }
-
-    public boolean isEnabled(final StartupCheckType startupCheckType)
-    {
-        return 
Boolean.parseBoolean(options.get(startupCheckType).get(ENABLED_PROPERTY).toString());
-    }
-
-    public boolean isDisabled(final StartupCheckType startupCheckType)
-    {
-        return !isEnabled(startupCheckType);
-    }
-
-    public Map<String, Object> getConfig(final StartupCheckType 
startupCheckType)
-    {
-        return options.get(startupCheckType);
-    }
-
-    private void apply()
-    {
-        for (final StartupCheckType startupCheckType : 
StartupCheckType.values())
-        {
-            final Map<String, Object> configMap = 
options.computeIfAbsent(startupCheckType, k -> new HashMap<>());
-            if (configMap.containsKey(ENABLED_PROPERTY))
-                configMap.putIfAbsent(ENABLED_PROPERTY, FALSE);
-            else if (startupCheckType.disabledByDefault)
-                configMap.put(ENABLED_PROPERTY, FALSE);
-            else
-                configMap.put(ENABLED_PROPERTY, TRUE);
-        }
-        // clear if we put anything into it by accident & enable this check 
every time no matter what
-        options.get(non_configurable_check).clear();
-        options.get(non_configurable_check).put(ENABLED_PROPERTY, TRUE);
-    }
-}
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 67c0605111..0144fcae07 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -216,7 +216,6 @@ public class CassandraDaemon
     private JMXConnectorServer jmxServer;
 
     private final boolean runManaged;
-    protected final StartupChecks startupChecks;
     private boolean setupCompleted;
 
     public CassandraDaemon()
@@ -227,7 +226,6 @@ public class CassandraDaemon
     public CassandraDaemon(boolean runManaged)
     {
         this.runManaged = runManaged;
-        this.startupChecks = new 
StartupChecks().withDefaultTests().withTest(new FileSystemOwnershipCheck());
         this.setupCompleted = false;
     }
 
@@ -451,7 +449,7 @@ public class CassandraDaemon
     {
         try
         {
-            startupChecks.verify(DatabaseDescriptor.getStartupChecksOptions());
+            DatabaseDescriptor.getStartupChecksConfiguration().verify();
         }
         catch (StartupException e)
         {
diff --git a/src/java/org/apache/cassandra/service/DataResurrectionCheck.java 
b/src/java/org/apache/cassandra/service/DataResurrectionCheck.java
index cd7a4dda62..47c6d7beff 100644
--- a/src/java/org/apache/cassandra/service/DataResurrectionCheck.java
+++ b/src/java/org/apache/cassandra/service/DataResurrectionCheck.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.config.StartupChecksConfiguration;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -146,18 +146,30 @@ public class DataResurrectionCheck implements StartupCheck
     }
 
     @Override
-    public StartupChecks.StartupCheckType getStartupCheckType()
+    public boolean isConfigurable()
     {
-        return StartupChecks.StartupCheckType.check_data_resurrection;
+        return true;
     }
 
     @Override
-    public void execute(StartupChecksOptions options) throws StartupException
+    public String name()
     {
-        if (options.isDisabled(getStartupCheckType()))
+        return "check_data_resurrection";
+    }
+
+    @Override
+    public boolean isDisabledByDefault()
+    {
+        return true;
+    }
+
+    @Override
+    public void execute(StartupChecksConfiguration configuration) throws 
StartupException
+    {
+        if (configuration.isDisabled(name()))
             return;
 
-        Map<String, Object> config = 
options.getConfig(StartupChecks.StartupCheckType.check_data_resurrection);
+        Map<String, Object> config = configuration.getConfig(name());
         File heartbeatFile = getHeartbeatFile(config);
 
         if (!heartbeatFile.exists())
@@ -224,30 +236,30 @@ public class DataResurrectionCheck implements StartupCheck
     }
 
     @Override
-    public void postAction(StartupChecksOptions options)
+    public void postAction(StartupChecksConfiguration configuration)
     {
         // Schedule heartbeating after all checks have passed, not as part of 
the check,
         // as it might happen that other checks after it might fail, but we 
would be heartbeating already.
-        if 
(options.isEnabled(StartupChecks.StartupCheckType.check_data_resurrection))
-        {
-            Map<String, Object> config = 
options.getConfig(StartupChecks.StartupCheckType.check_data_resurrection);
-            File heartbeatFile = 
DataResurrectionCheck.getHeartbeatFile(config);
+        if (!configuration.isEnabled(name()))
+            return;
 
-            ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() ->
+        Map<String, Object> configMap = configuration.getConfig(name());
+        File heartbeatFile = DataResurrectionCheck.getHeartbeatFile(configMap);
+
+        ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() ->
+        {
+            Heartbeat heartbeat = new 
Heartbeat(Instant.ofEpochMilli(Clock.Global.currentTimeMillis()));
+            try
             {
-                Heartbeat heartbeat = new 
Heartbeat(Instant.ofEpochMilli(Clock.Global.currentTimeMillis()));
-                try
-                {
-                    heartbeatFile.parent().createDirectoriesIfNotExists();
-                    DataResurrectionCheck.LOGGER.trace("writing heartbeat to 
file " + heartbeatFile);
-                    heartbeat.serializeToJsonFile(heartbeatFile);
-                }
-                catch (IOException ex)
-                {
-                    DataResurrectionCheck.LOGGER.error("Unable to serialize 
heartbeat to " + heartbeatFile, ex);
-                }
-            }, 0, 
CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD.getInt(), 
MILLISECONDS);
-        }
+                heartbeatFile.parent().createDirectoriesIfNotExists();
+                DataResurrectionCheck.LOGGER.trace("writing heartbeat to file 
" + heartbeatFile);
+                heartbeat.serializeToJsonFile(heartbeatFile);
+            }
+            catch (IOException ex)
+            {
+                DataResurrectionCheck.LOGGER.error("Unable to serialize 
heartbeat to " + heartbeatFile, ex);
+            }
+        }, 0, 
CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD.getInt(), 
MILLISECONDS);
     }
 
     @VisibleForTesting
diff --git 
a/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java 
b/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
index b8fe30b823..e378b37e21 100644
--- a/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
+++ b/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
@@ -39,12 +39,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.config.StartupChecksConfiguration;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.File;
 
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_filesystem_ownership;
-
 /**
  * Ownership markers on disk are compatible with the java property file format.
  * 
(https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html#load-java.io.Reader-)
@@ -99,7 +97,7 @@ public class FileSystemOwnershipCheck implements StartupCheck
 
     private final Supplier<Iterable<String>> dirs;
 
-    FileSystemOwnershipCheck()
+    public FileSystemOwnershipCheck()
     {
         this(() -> 
Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
                                     
Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
@@ -114,21 +112,33 @@ public class FileSystemOwnershipCheck implements 
StartupCheck
     }
 
     @Override
-    public StartupChecks.StartupCheckType getStartupCheckType()
+    public String name()
+    {
+        return "check_filesystem_ownership";
+    }
+
+    @Override
+    public boolean isConfigurable()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean isDisabledByDefault()
     {
-        return check_filesystem_ownership;
+        return true;
     }
 
     @Override
-    public void execute(StartupChecksOptions options) throws StartupException
+    public void execute(StartupChecksConfiguration configuration) throws 
StartupException
     {
-        if (!isEnabled(options))
+        if (!isEnabled(configuration))
         {
             logger.info("Filesystem ownership check is not enabled.");
             return;
         }
 
-        Map<String, Object> config = options.getConfig(getStartupCheckType());
+        Map<String, Object> config = configuration.getConfig(name());
 
         String expectedToken = constructTokenFromProperties(config);
         String tokenFilename = getFsOwnershipFilename(config);
@@ -275,9 +285,9 @@ public class FileSystemOwnershipCheck implements 
StartupCheck
         return new StartupException(StartupException.ERR_WRONG_DISK_STATE, 
ERROR_PREFIX + message);
     }
 
-    public boolean isEnabled(StartupChecksOptions options)
+    public boolean isEnabled(StartupChecksConfiguration options)
     {
-        boolean enabledFromYaml = options.isEnabled(getStartupCheckType());
+        boolean enabledFromYaml = options.isEnabled(name());
         return 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE.getBoolean(enabledFromYaml);
     }
 
diff --git a/src/java/org/apache/cassandra/service/StartupCheck.java 
b/src/java/org/apache/cassandra/service/StartupCheck.java
index c3790e8e99..720ecba7f5 100644
--- a/src/java/org/apache/cassandra/service/StartupCheck.java
+++ b/src/java/org/apache/cassandra/service/StartupCheck.java
@@ -17,9 +17,8 @@
  */
 package org.apache.cassandra.service;
 
-import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.config.StartupChecksConfiguration;
 import org.apache.cassandra.exceptions.StartupException;
-import org.apache.cassandra.service.StartupChecks.StartupCheckType;
 
 /**
  * A test to determine if the system is in a valid state to start up.
@@ -35,33 +34,56 @@ import 
org.apache.cassandra.service.StartupChecks.StartupCheckType;
  */
 public interface StartupCheck
 {
+    /**
+     * Name of a startup check, as it would appear in the configuration in 
cassandra.yaml.
+     * Not all startup checks are configurable. It is considered to be an 
illegal state to
+     * mention non-configurable startup check in cassandra.yaml.
+     *
+     * @return name of a startup check
+     */
+    String name();
+
     /**
      * Run some test to determine whether the system is safe to be started
      * In the case where a test determines it is not safe to proceed, the
      * test should log a message regarding the reason for the failure and
      * ideally the steps required to remedy the problem.
      *
-     * @param startupChecksOptions all options from descriptor
+     * @param configuration all options from descriptor
      * @throws org.apache.cassandra.exceptions.StartupException if the test 
determines
      * that the environement or system is not in a safe state to startup
      */
-    void execute(StartupChecksOptions startupChecksOptions) throws 
StartupException;
+    void execute(StartupChecksConfiguration configuration) throws 
StartupException;
+
+    /**
+     * Tells whether a startup check can be configured, at the moment via 
cassandra.yml.
+     *
+     * @return true if a startup check is configurable, false otherwise.
+     */
+    default boolean isConfigurable()
+    {
+        return false;
+    }
 
     /**
+     * Tells if a specific (configurable) check is executed when it is not 
specified in cassandra.yaml. By default,
+     * an implementation of a startup check is executed even if it is not 
specified. For some checks, it might be
+     * preferential to not execute them when they are not explicity mentioned.
      *
-     * @return type of this startup check for configuration retrieval
+     * @return true if a check is not executed when it is not specified in 
cassandra.yaml, defaults to false - that is,
+     * a check will be executed even if it is not explicitly mentioned in 
cassandra.yaml.
      */
-    default StartupCheckType getStartupCheckType()
+    default boolean isDisabledByDefault()
     {
-        return StartupCheckType.non_configurable_check;
+        return false;
     }
 
     /**
      * Post-hook after all startup checks succeeded.
      *
-     * @param options startup check options from descriptor
+     * @param configuration startup check options from descriptor
      */
-    default void postAction(StartupChecksOptions options)
+    default void postAction(StartupChecksConfiguration configuration)
     {
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java 
b/src/java/org/apache/cassandra/service/StartupChecks.java
index 6de0158a9c..b3b33ee93b 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -36,6 +36,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -57,7 +59,7 @@ import 
org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.JMXServerOptions;
-import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.config.StartupChecksConfiguration;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -105,26 +107,6 @@ import static 
org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;
  */
 public class StartupChecks
 {
-    public enum StartupCheckType
-    {
-        // non-configurable check is always enabled for execution
-        non_configurable_check,
-        check_filesystem_ownership(true),
-        check_data_resurrection(true);
-
-        public final boolean disabledByDefault;
-
-        StartupCheckType()
-        {
-            this(false);
-        }
-
-        StartupCheckType(boolean disabledByDefault)
-        {
-            this.disabledByDefault = disabledByDefault;
-        }
-    }
-
     private static final Logger logger = 
LoggerFactory.getLogger(StartupChecks.class);
     // List of checks to run before starting up. If any test reports failure, 
startup will be halted.
     private final List<StartupCheck> preFlightChecks = new ArrayList<>();
@@ -150,12 +132,77 @@ public class StartupChecks
                                                                       
checkKernelParamsForAsyncProfiler,
                                                                       new 
DataResurrectionCheck());
 
+    public List<StartupCheck> getChecks()
+    {
+        return List.copyOf(preFlightChecks);
+    }
+
+    public StartupCheck getCheck(String name)
+    {
+        for (StartupCheck startupCheck : preFlightChecks)
+        {
+            if (startupCheck.name().equals(name))
+                return startupCheck;
+        }
+        return null;
+    }
+
     public StartupChecks withDefaultTests()
     {
         preFlightChecks.addAll(DEFAULT_TESTS);
         return this;
     }
 
+    public StartupChecks withServiceLoaderTests()
+    {
+        ServiceLoader<StartupCheck> loader;
+
+        try
+        {
+            loader = ServiceLoader.load(StartupCheck.class);
+        }
+        catch (ServiceConfigurationError t)
+        {
+            logger.warn("Unable to get startup checks via ServiceLoader. " +
+                        "Custom checks will not be triggered. Reason: " + 
t.getMessage());
+            return this;
+        }
+
+        Set<StartupCheck> customChecks = new HashSet<>();
+        Set<String> uniqueNames = new HashSet<>();
+        Set<String> duplicitNames = new HashSet<>();
+
+        for (StartupCheck check : loader)
+        {
+            if (!uniqueNames.add(check.name()))
+                duplicitNames.add(check.name());
+            else
+                customChecks.add(check);
+        }
+
+        if (!duplicitNames.isEmpty())
+        {
+            throw new IllegalStateException("There was an attempt to load 
custom startup " +
+                                            "checks with same name which is 
ambiguous: " + duplicitNames);
+        }
+
+        for (StartupCheck customCheck : customChecks)
+        {
+            for (StartupCheck preFlightCheck : preFlightChecks)
+            {
+                if (preFlightCheck.name().equals(customCheck.name()))
+                {
+                    throw new IllegalStateException("There was an attempt to 
load custom startup check " +
+                                                    "with same name as 
in-built check: " + preFlightCheck.name());
+                }
+            }
+        }
+
+        preFlightChecks.addAll(customChecks);
+
+        return this;
+    }
+
     /**
      * Add system test to be run before schema is loaded during startup
      * @param test the system test to include
@@ -172,7 +219,7 @@ public class StartupChecks
      * system is not in an valid state to startup
      * @param options options to pass to respective checks for their 
configration
      */
-    public void verify(StartupChecksOptions options) throws StartupException
+    public void verify(StartupChecksConfiguration options) throws 
StartupException
     {
         for (StartupCheck test : preFlightChecks)
             test.execute(options);
@@ -185,7 +232,7 @@ public class StartupChecks
             }
             catch (Throwable t)
             {
-                logger.warn("Failed to run startup check post-action on " + 
test.getStartupCheckType());
+                logger.warn("Failed to run startup check post-action on " + 
test.name());
             }
         }
     }
@@ -194,9 +241,15 @@ public class StartupChecks
     public static final StartupCheck checkKernelBug1057843 = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions startupChecksOptions) throws 
StartupException
+        public String name()
+        {
+            return "kernel_bug_1057843";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration) throws 
StartupException
         {
-            if (startupChecksOptions.isDisabled(getStartupCheckType()))
+            if (configuration.isDisabled(name()))
                 return;
 
             if (!FBUtilities.isLinux)
@@ -251,9 +304,15 @@ public class StartupChecks
     public static final StartupCheck checkJemalloc = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options)
+        public String name()
         {
-            if (options.isDisabled(getStartupCheckType()))
+            return "jemalloc";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration)
+        {
+            if (configuration.isDisabled(name()))
                 return;
 
             String jemalloc = 
CassandraRelevantProperties.LIBJEMALLOC.getString();
@@ -269,9 +328,15 @@ public class StartupChecks
     public static final StartupCheck checkLz4Native = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options)
+        public String name()
+        {
+            return "lz4_native";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration)
         {
-            if (options.isDisabled(getStartupCheckType()))
+            if (configuration.isDisabled(name()))
                 return;
             try
             {
@@ -286,6 +351,12 @@ public class StartupChecks
 
     public static final StartupCheck checkValidLaunchDate = new StartupCheck()
     {
+        @Override
+        public String name()
+        {
+            return "valid_launch_date";
+        }
+
         /**
          * The earliest legit timestamp a casandra instance could have ever 
launched.
          * Date roughly taken from 
http://perspectives.mvdirona.com/2008/07/12/FacebookReleasesCassandraAsOpenSource.aspx
@@ -294,9 +365,9 @@ public class StartupChecks
         private static final long EARLIEST_LAUNCH_DATE = 1215820800000L;
 
         @Override
-        public void execute(StartupChecksOptions options) throws 
StartupException
+        public void execute(StartupChecksConfiguration configuration) throws 
StartupException
         {
-            if (options.isDisabled(getStartupCheckType()))
+            if (configuration.isDisabled(name()))
                 return;
             long now = currentTimeMillis();
             if (now < EARLIEST_LAUNCH_DATE)
@@ -309,9 +380,15 @@ public class StartupChecks
     public static final StartupCheck checkJMXPorts = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options)
+        public String name()
+        {
+            return "jmx_ports";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration)
         {
-            if (options.isDisabled(getStartupCheckType()))
+            if (configuration.isDisabled(name()))
                 return;
 
             JMXServerOptions jmxServerOptions = 
DatabaseDescriptor.getJmxServerOptions();
@@ -335,9 +412,15 @@ public class StartupChecks
     public static final StartupCheck checkJMXProperties = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options)
+        public String name()
+        {
+            return "jmx_properties";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration)
         {
-            if (options.isDisabled(getStartupCheckType()))
+            if (configuration.isDisabled(name()))
                 return;
             if (COM_SUN_MANAGEMENT_JMXREMOTE_PORT.isPresent())
             {
@@ -350,9 +433,15 @@ public class StartupChecks
     public static final StartupCheck inspectJvmOptions = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options)
+        public String name()
         {
-            if (options.isDisabled(getStartupCheckType()))
+            return "jvm_options";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration)
+        {
+            if (configuration.isDisabled(name()))
                 return;
             // log warnings for different kinds of sub-optimal JVMs.  tldr use 
64-bit Oracle >= 1.6u32
             if (!DatabaseDescriptor.hasLargeAddressSpace())
@@ -412,9 +501,15 @@ public class StartupChecks
     public static final StartupCheck checkNativeLibraryInitialization = new 
StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options) throws 
StartupException
+        public String name()
+        {
+            return "native_library_initialization";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration) throws 
StartupException
         {
-            if (options.isDisabled(getStartupCheckType()))
+            if (configuration.isDisabled(name()))
                 return;
             // Fail-fast if the native library could not be linked.
             if (!NativeLibrary.isAvailable())
@@ -425,7 +520,13 @@ public class StartupChecks
     public static final StartupCheck checkProcessEnvironment = new 
StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options)
+        public String name()
+        {
+            return "process_environment";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration)
         {
             Optional<String> degradations = 
FBUtilities.getSystemInfo().isDegraded();
 
@@ -438,6 +539,12 @@ public class StartupChecks
 
     public static final StartupCheck checkReadAheadKbSetting = new 
StartupCheck()
     {
+        @Override
+        public String name()
+        {
+            return "read_ahead_kb_setting";
+        }
+
         // This value is in KB.
         private static final long MAX_RECOMMENDED_READ_AHEAD_KB_SETTING = 128;
 
@@ -472,9 +579,9 @@ public class StartupChecks
         }
 
         @Override
-        public void execute(StartupChecksOptions options)
+        public void execute(StartupChecksConfiguration configuration)
         {
-            if (options.isDisabled(getStartupCheckType()) || 
!FBUtilities.isLinux)
+            if (configuration.isDisabled(name()) || !FBUtilities.isLinux)
                 return;
 
             String[] dataDirectories = 
DatabaseDescriptor.getRawConfig().data_file_directories;
@@ -518,6 +625,12 @@ public class StartupChecks
 
     public static final StartupCheck checkMaxMapCount = new StartupCheck()
     {
+        @Override
+        public String name()
+        {
+            return "max_map_count";
+        }
+
         private final long EXPECTED_MAX_MAP_COUNT = 1048575;
         private final String MAX_MAP_COUNT_PATH = "/proc/sys/vm/max_map_count";
 
@@ -547,9 +660,9 @@ public class StartupChecks
         }
 
         @Override
-        public void execute(StartupChecksOptions options)
+        public void execute(StartupChecksConfiguration configuration)
         {
-            if (options.isDisabled(getStartupCheckType()) || 
!FBUtilities.isLinux)
+            if (configuration.isDisabled(name()) || !FBUtilities.isLinux)
                 return;
 
             if (DatabaseDescriptor.getDiskAccessMode() == 
Config.DiskAccessMode.standard &&
@@ -567,9 +680,15 @@ public class StartupChecks
     public static final StartupCheck checkDataDirs = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options) throws 
StartupException
+        public String name()
+        {
+            return "data_dirs";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration) throws 
StartupException
         {
-            if (options.isDisabled(getStartupCheckType()))
+            if (configuration.isDisabled(name()))
                 return;
             // check all directories(data, commitlog, saved cache) for 
existence and permission
             Iterable<String> dirs = 
Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
@@ -602,9 +721,15 @@ public class StartupChecks
     public static final StartupCheck checkSSTablesFormat = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options) throws 
StartupException
+        public String name()
+        {
+            return "sstables_format";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration) throws 
StartupException
         {
-            if (options.isDisabled(getStartupCheckType()))
+            if (configuration.isDisabled(name()))
                 return;
             final Set<String> invalid = new HashSet<>();
             final Set<String> nonSSTablePaths = new HashSet<>();
@@ -717,9 +842,15 @@ public class StartupChecks
     public static final StartupCheck checkSystemKeyspaceState = new 
StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options) throws 
StartupException
+        public String name()
         {
-            if (options.isDisabled(getStartupCheckType()))
+            return "system_keyspace_state";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration) throws 
StartupException
+        {
+            if (configuration.isDisabled(name()))
                 return;
             // check the system keyspace to keep user from shooting self in 
foot by changing partitioner, cluster name, etc.
             // we do a one-off scrub of the system keyspace first; we can't 
load the list of the rest of the keyspaces,
@@ -748,9 +879,15 @@ public class StartupChecks
     public static final StartupCheck checkLegacyAuthTables = new StartupCheck()
     {
         @Override
-        public void execute(StartupChecksOptions options) throws 
StartupException
+        public String name()
         {
-            if (options.isDisabled(getStartupCheckType()))
+            return "legacy_auth_tables";
+        }
+
+        @Override
+        public void execute(StartupChecksConfiguration configuration) throws 
StartupException
+        {
+            if (configuration.isDisabled(name()))
                 return;
             Optional<String> errMsg = checkLegacyAuthTablesMessage();
             if (errMsg.isPresent())
@@ -792,7 +929,14 @@ public class StartupChecks
             return perfEventParanoid <= 1 && kptrRestrict == 0;
         }
 
-        public void execute(StartupChecksOptions startupChecksOptions, boolean 
shouldThrow)
+
+        @Override
+        public String name()
+        {
+            return "async_profiler_kernel_parameters";
+        }
+
+        public void execute(StartupChecksConfiguration 
startupChecksConfiguration, boolean shouldThrow)
         {
             try
             {
@@ -824,9 +968,9 @@ public class StartupChecks
         }
 
         @Override
-        public void execute(StartupChecksOptions startupChecksOptions)
+        public void execute(StartupChecksConfiguration configuration)
         {
-            execute(startupChecksOptions, false);
+            execute(configuration, false);
         }
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/DataResurrectionCheckTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/DataResurrectionCheckTest.java
index f48c872552..adb6627543 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/DataResurrectionCheckTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/DataResurrectionCheckTest.java
@@ -18,36 +18,34 @@
 
 package org.apache.cassandra.distributed.test;
 
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.Test;
 
-import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.StartupChecksConfiguration;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.shared.WithProperties;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.service.DataResurrectionCheck;
 import org.apache.cassandra.service.DataResurrectionCheck.Heartbeat;
-import org.apache.cassandra.service.StartupChecks.StartupCheckType;
+import org.apache.cassandra.service.StartupCheck;
 import org.apache.cassandra.utils.Clock.Global;
 
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD;
-import static 
org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY;
+import static 
org.apache.cassandra.config.StartupChecksConfiguration.ENABLED_PROPERTY;
 import static org.apache.cassandra.distributed.Cluster.build;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static 
org.apache.cassandra.service.DataResurrectionCheck.DEFAULT_HEARTBEAT_FILE;
 import static 
org.apache.cassandra.service.DataResurrectionCheck.EXCLUDED_KEYSPACES_CONFIG_PROPERTY;
 import static 
org.apache.cassandra.service.DataResurrectionCheck.EXCLUDED_TABLES_CONFIG_PROPERTY;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_data_resurrection;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertNotNull;
@@ -137,14 +135,21 @@ public class DataResurrectionCheckTest extends 
TestBaseImpl
         {
             try
             {
-                DataResurrectionCheck check = new DataResurrectionCheck();
-                StartupChecksOptions startupChecksOptions = new 
StartupChecksOptions();
-                startupChecksOptions.enable(check_data_resurrection);
+                StartupChecksConfiguration checksConfiguration = 
DatabaseDescriptor.getStartupChecksConfiguration();
+                checksConfiguration.enable("check_data_resurrection");
+                Map<String, Object> currentConfig = 
checksConfiguration.getConfig("check_data_resurrection");
+                if (currentConfig != null)
+                {
+                    boolean wasEnabled = 
checksConfiguration.isEnabled("check_data_resurrection");
+                    currentConfig.clear();
+                    currentConfig.put(ENABLED_PROPERTY, wasEnabled);
+                }
 
                 for (int i = 0; i < config.length - 1; i = i + 2)
-                    startupChecksOptions.set(check_data_resurrection, 
config[i], config[i + 1]);
+                    checksConfiguration.set("check_data_resurrection", 
config[i], config[i + 1]);
 
-                check.execute(startupChecksOptions);
+                StartupCheck check = 
checksConfiguration.getCheck("check_data_resurrection");
+                check.execute(checksConfiguration);
                 return null;
             }
             catch (StartupException e)
@@ -154,12 +159,12 @@ public class DataResurrectionCheckTest extends 
TestBaseImpl
         }).call();
     }
 
-    private Map<StartupCheckType, Map<String, Object>> 
getStartupChecksConfig(String... configs)
+    private Map<String, Map<String, Object>> getStartupChecksConfig(String... 
configs)
     {
-        return new EnumMap<StartupCheckType, Map<String, 
Object>>(StartupCheckType.class)
+        return new HashMap<>()
         {{
-            put(check_data_resurrection,
-                new HashMap<String, Object>()
+            put("check_data_resurrection",
+                new HashMap<>()
                 {{
                     for (int i = 0; i < configs.length - 1; i = i + 2)
                         put(configs[i], configs[i + 1]);
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index ffd0522ee8..5a6e2086ff 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -176,8 +176,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.RetrySpec",
     "org.apache.cassandra.config.RetrySpec$MaxAttempt",
     "org.apache.cassandra.config.RetrySpec$Type",
-    "org.apache.cassandra.config.StartupChecksOptions",
-    "org.apache.cassandra.config.StartupChecksOptions",
+    "org.apache.cassandra.config.StartupChecksConfiguration",
     "org.apache.cassandra.config.StorageAttachedIndexOptions",
     "org.apache.cassandra.config.StringRetryStrategy",
     "org.apache.cassandra.config.SubnetGroups",
@@ -327,6 +326,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.service.RetryStrategy$WaitRandomizerFactory",
     "org.apache.cassandra.service.RetryStrategy$1",
     "org.apache.cassandra.service.RetryStrategy$WaitRandomizer",
+    "org.apache.cassandra.service.StartupCheck",
     "org.apache.cassandra.service.TimeoutStrategy",
     "org.apache.cassandra.service.TimeoutStrategy$LatencyModifierFactory",
     "org.apache.cassandra.service.TimeoutStrategy$Wait",
diff --git a/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java 
b/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java
deleted file mode 100644
index f613c0ecda..0000000000
--- a/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.config;
-
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.Test;
-
-import org.apache.cassandra.service.DataResurrectionCheck;
-import org.apache.cassandra.service.StartupChecks.StartupCheckType;
-import org.apache.cassandra.utils.Pair;
-
-import static 
org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_filesystem_ownership;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.non_configurable_check;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class StartupCheckOptionsTest
-{
-    @Test
-    public void testStartupOptionsConfigApplication()
-    {
-        Map<StartupCheckType, Map<String, Object>> config = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
-            put(check_filesystem_ownership, new HashMap<String, Object>() {{
-                put(ENABLED_PROPERTY, true);
-                put("key", "value");
-            }});
-        }};
-
-        StartupChecksOptions options = new StartupChecksOptions(config);
-
-        
assertTrue(Boolean.parseBoolean(options.getConfig(check_filesystem_ownership)
-                                               .get(ENABLED_PROPERTY)
-                                               .toString()));
-
-        assertEquals("value", 
options.getConfig(check_filesystem_ownership).get("key"));
-        options.set(check_filesystem_ownership, "key", "value2");
-        assertEquals("value2", 
options.getConfig(check_filesystem_ownership).get("key"));
-
-        assertTrue(options.isEnabled(check_filesystem_ownership));
-        options.disable(check_filesystem_ownership);
-        assertFalse(options.isEnabled(check_filesystem_ownership));
-        assertTrue(options.isDisabled(check_filesystem_ownership));
-    }
-
-    @Test
-    public void testNoOptions()
-    {
-        StartupChecksOptions options = new StartupChecksOptions();
-
-        assertTrue(options.isEnabled(non_configurable_check));
-
-        // disabling does not to anything on non-configurable check
-        options.disable(non_configurable_check);
-        assertTrue(options.isEnabled(non_configurable_check));
-
-        options.set(non_configurable_check, "key", "value");
-
-        // we can not put anything into non-configurable check
-        
assertFalse(options.getConfig(non_configurable_check).containsKey("key"));
-    }
-
-    @Test
-    public void testEmptyDisabledValues()
-    {
-        Map<StartupCheckType, Map<String, Object>> emptyConfig = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
-            put(check_filesystem_ownership, new HashMap<>());
-        }};
-
-        Map<StartupCheckType, Map<String, Object>> emptyEnabledConfig = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
-            put(check_filesystem_ownership, new HashMap<String, Object>() {{
-                put(ENABLED_PROPERTY, null);
-            }});
-        }};
-
-        // empty enabled property or enabled property with null value are 
still counted as enabled
-
-        StartupChecksOptions options1 = new StartupChecksOptions(emptyConfig);
-        assertTrue(options1.isDisabled(check_filesystem_ownership));
-
-        StartupChecksOptions options2 = new 
StartupChecksOptions(emptyEnabledConfig);
-        assertTrue(options2.isDisabled(check_filesystem_ownership));
-    }
-
-    @Test
-    public void testChecksDisabledByDefaultAreNotEnabled()
-    {
-        Map<StartupCheckType, Map<String, Object>> emptyConfig = new 
EnumMap<>(StartupCheckType.class);
-        StartupChecksOptions options = new StartupChecksOptions(emptyConfig);
-        assertTrue(options.isDisabled(check_filesystem_ownership));
-    }
-
-    @Test
-    public void testExcludedKeyspacesInDataResurrectionCheckOptions()
-    {
-        Map<String, Object> config = new HashMap<String, Object>(){{
-            put("excluded_keyspaces", "ks1,ks2,ks3");
-        }};
-        DataResurrectionCheck check = new DataResurrectionCheck();
-        check.getExcludedKeyspaces(config);
-
-        Set<String> excludedKeyspaces = check.getExcludedKeyspaces(config);
-        assertEquals(3, excludedKeyspaces.size());
-        assertTrue(excludedKeyspaces.contains("ks1"));
-        assertTrue(excludedKeyspaces.contains("ks2"));
-        assertTrue(excludedKeyspaces.contains("ks3"));
-    }
-
-    @Test
-    public void testExcludedTablesInDataResurrectionCheckOptions()
-    {
-        for (String input : new String[]{
-        "ks1.tb1,ks1.tb2,ks3.tb3",
-        " ks1 . tb1,  ks1 .tb2  ,ks3 .tb3  "
-        })
-        {
-            Map<String, Object> config = new HashMap<String, Object>(){{
-                put("excluded_tables", input);
-            }};
-
-            DataResurrectionCheck check = new DataResurrectionCheck();
-            Set<Pair<String, String>> excludedTables = 
check.getExcludedTables(config);
-            assertEquals(3, excludedTables.size());
-            assertTrue(excludedTables.contains(Pair.create("ks1", "tb1")));
-            assertTrue(excludedTables.contains(Pair.create("ks1", "tb2")));
-            assertTrue(excludedTables.contains(Pair.create("ks3", "tb3")));
-        }
-    }
-}
diff --git 
a/test/unit/org/apache/cassandra/config/StartupChecksConfigurationTest.java 
b/test/unit/org/apache/cassandra/config/StartupChecksConfigurationTest.java
new file mode 100644
index 0000000000..72cb806813
--- /dev/null
+++ b/test/unit/org/apache/cassandra/config/StartupChecksConfigurationTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.service.DataResurrectionCheck;
+import org.apache.cassandra.service.FileSystemOwnershipCheck;
+import org.apache.cassandra.service.StartupCheck;
+import org.apache.cassandra.service.StartupChecks;
+import org.apache.cassandra.utils.Pair;
+
+import static 
org.apache.cassandra.config.StartupChecksConfiguration.ENABLED_PROPERTY;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class StartupChecksConfigurationTest
+{
+    @Test
+    public void testStartupOptionsConfigApplication()
+    {
+        Map<String, Map<String, Object>> config = new HashMap<>()
+        {{
+            put("check_filesystem_ownership", new HashMap<>()
+            {{
+                put(ENABLED_PROPERTY, true);
+                put("key", "value");
+            }});
+        }};
+
+        StartupChecks startupChecks = new 
StartupChecks().withDefaultTests().withTest(new FileSystemOwnershipCheck());
+        StartupChecksConfiguration options = new 
StartupChecksConfiguration(startupChecks, config);
+
+        
assertTrue(Boolean.parseBoolean(options.getConfig("check_filesystem_ownership")
+                                               .get(ENABLED_PROPERTY)
+                                               .toString()));
+
+        assertEquals("value", 
options.getConfig("check_filesystem_ownership").get("key"));
+        options.set("check_filesystem_ownership", "key", "value2");
+        assertEquals("value2", 
options.getConfig("check_filesystem_ownership").get("key"));
+
+        assertTrue(options.isEnabled("check_filesystem_ownership"));
+        options.disable("check_filesystem_ownership");
+        assertFalse(options.isEnabled("check_filesystem_ownership"));
+        assertTrue(options.isDisabled("check_filesystem_ownership"));
+    }
+
+    @Test
+    public void testNoOptions()
+    {
+
+        StartupChecks startupChecks = new StartupChecks().withDefaultTests();
+
+        StartupChecksConfiguration options = new 
StartupChecksConfiguration(new StartupChecks().withDefaultTests(), new 
HashMap<>());
+
+        for (StartupCheck check : startupChecks.getChecks())
+        {
+            if (!check.isConfigurable())
+                assertTrue(options.isEnabled(check.name()));
+        }
+
+        // disabling does not do anything on non-configurable check
+
+        Optional<StartupCheck> nonConfigurableCheck = 
startupChecks.getChecks().stream().filter(check -> 
!check.isConfigurable()).findFirst();
+
+        Assert.assertTrue(nonConfigurableCheck.isPresent());
+
+        String checkName = nonConfigurableCheck.get().name();
+
+        options.disable(checkName);
+
+        assertTrue(options.isEnabled(checkName));
+
+        options.set(checkName, "key", "value");
+        // we can not put anything into non-configurable check
+        Map<String, Object> nonConfigurableCheckConfig = 
options.getConfig(checkName);
+        assertNotNull(nonConfigurableCheckConfig);
+        assertFalse(nonConfigurableCheckConfig.containsKey("key"));
+    }
+
+    @Test
+    public void testEmptyDisabledValues()
+    {
+        Map<String, Map<String, Object>> emptyConfig = new HashMap<>()
+        {{
+            put("check_filesystem_ownership", new HashMap<>());
+        }};
+
+        Map<String, Map<String, Object>> emptyEnabledConfig = new HashMap<>()
+        {{
+            put("check_filesystem_ownership", new HashMap<>()
+            {{
+                put(ENABLED_PROPERTY, null);
+            }});
+        }};
+
+        // empty enabled property or enabled property with null value are 
still counted as disabled
+
+        StartupChecks startupChecks = new 
StartupChecks().withDefaultTests().withTest(new FileSystemOwnershipCheck());
+        StartupChecksConfiguration options1 = new 
StartupChecksConfiguration(startupChecks, emptyConfig);
+        assertTrue(options1.isDisabled("check_filesystem_ownership"));
+
+        StartupChecksConfiguration options2 = new 
StartupChecksConfiguration(startupChecks, emptyEnabledConfig);
+        assertTrue(options2.isDisabled("check_filesystem_ownership"));
+    }
+
+    @Test
+    public void testChecksDisabledByDefaultAreNotEnabled()
+    {
+        Map<String, Map<String, Object>> emptyConfig = new HashMap<>();
+        StartupChecksConfiguration options = new 
StartupChecksConfiguration(new StartupChecks().withDefaultTests(), emptyConfig);
+        assertTrue(options.isDisabled("check_filesystem_ownership"));
+    }
+
+    @Test
+    public void testExcludedKeyspacesInDataResurrectionCheckOptions()
+    {
+        Map<String, Object> config = new HashMap<>()
+        {{
+            put("excluded_keyspaces", "ks1,ks2,ks3");
+        }};
+        DataResurrectionCheck check = new DataResurrectionCheck();
+        check.getExcludedKeyspaces(config);
+
+        Set<String> excludedKeyspaces = check.getExcludedKeyspaces(config);
+        assertEquals(3, excludedKeyspaces.size());
+        assertTrue(excludedKeyspaces.contains("ks1"));
+        assertTrue(excludedKeyspaces.contains("ks2"));
+        assertTrue(excludedKeyspaces.contains("ks3"));
+    }
+
+    @Test
+    public void testExcludedTablesInDataResurrectionCheckOptions()
+    {
+        for (String input : new String[]{
+        "ks1.tb1,ks1.tb2,ks3.tb3",
+        " ks1 . tb1,  ks1 .tb2  ,ks3 .tb3  "
+        })
+        {
+            Map<String, Object> config = new HashMap<>()
+            {{
+                put("excluded_tables", input);
+            }};
+
+            DataResurrectionCheck check = new DataResurrectionCheck();
+            Set<Pair<String, String>> excludedTables = 
check.getExcludedTables(config);
+            assertEquals(3, excludedTables.size());
+            assertTrue(excludedTables.contains(Pair.create("ks1", "tb1")));
+            assertTrue(excludedTables.contains(Pair.create("ks1", "tb2")));
+            assertTrue(excludedTables.contains(Pair.create("ks3", "tb3")));
+        }
+    }
+
+    @Test
+    public void testNonConfigurableCheckIsNotConfigurable()
+    {
+        StartupChecks startupChecks = new StartupChecks().withDefaultTests();
+        Optional<StartupCheck> maybeNotConfigurableCheck = 
startupChecks.getChecks().stream().filter(check -> 
!check.isConfigurable()).findFirst();
+        Assert.assertTrue(maybeNotConfigurableCheck.isPresent());
+
+        StartupCheck check = maybeNotConfigurableCheck.get();
+
+        Map<String, Map<String, Object>> config = new HashMap<>()
+        {
+            {
+                put(check.name(), new HashMap<>()
+                {{
+                    put(ENABLED_PROPERTY, false);
+                    put("key", "value");
+                }});
+            }
+        };
+
+        assertThatThrownBy(() -> new StartupChecksConfiguration(startupChecks, 
config))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage(String.format("There are configuration entries for startup 
checks which are not configurable: [%s]", check.name()));
+    }
+
+    @Test
+    public void testNotExistingCheckYieldsInvalidState()
+    {
+        StartupChecks startupChecks = new StartupChecks().withDefaultTests();
+        Map<String, Map<String, Object>> config = new HashMap<>()
+        {
+            {
+                put("jemalloc", new HashMap<>()
+                {{
+                    put(ENABLED_PROPERTY, true);
+                    put("key", "value");
+                }});
+                put("check_data_resurrection", new HashMap<>()
+                {{
+                    put(ENABLED_PROPERTY, true);
+                    put("key2", "value2");
+                }});
+            }
+        };
+
+        assertThatThrownBy(() -> new StartupChecksConfiguration(startupChecks, 
config))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("There are configuration entries for startup checks which 
are not configurable: [jemalloc]");
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java 
b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
index 1992a50dd5..a816a9f9fb 100644
--- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
+++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
@@ -46,7 +46,6 @@ import 
org.apache.cassandra.distributed.shared.WithEnvironment;
 import org.apache.cassandra.distributed.shared.WithProperties;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
-import org.apache.cassandra.service.StartupChecks;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.CONFIG_ALLOW_ENVIRONMENT_VARIABLES;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.CONFIG_ALLOW_SYSTEM_PROPERTIES;
@@ -179,8 +178,8 @@ public class YamlConfigurationLoaderTest
             assertThat(config.paxos_variant).isEqualTo(Config.PaxosVariant.v2);
             assertThat(config.client_error_reporting_exclusions).isEqualTo(new 
SubnetGroups(Arrays.asList("127.0.0.2", "127.0.0.1")));
             assertThat(config.startup_checks).hasSize(1);
-            
assertThat(config.startup_checks.get(StartupChecks.StartupCheckType.check_data_resurrection).get("enabled")).isEqualTo(Boolean.TRUE.toString());
-            
assertThat(config.startup_checks.get(StartupChecks.StartupCheckType.check_data_resurrection).get("heartbeat_file")).isEqualTo("/var/lib/cassandra/data/cassandra-heartbeat");
+            
assertThat(config.startup_checks.get("check_data_resurrection").get("enabled")).isEqualTo(Boolean.TRUE.toString());
+            
assertThat(config.startup_checks.get("check_data_resurrection").get("heartbeat_file")).isEqualTo("/var/lib/cassandra/data/cassandra-heartbeat");
         }
 
         try (WithProperties ignore = new WithProperties()
@@ -278,8 +277,8 @@ public class YamlConfigurationLoaderTest
             
assertThat(config.memtable.configurations.get("default").inherits).isEqualTo("trie");
             assertThat(config.client_error_reporting_exclusions).isEqualTo(new 
SubnetGroups(Arrays.asList("127.0.0.2", "127.0.0.1")));
             assertThat(config.startup_checks).hasSize(1);
-            
assertThat(config.startup_checks.get(StartupChecks.StartupCheckType.check_data_resurrection).get("enabled")).isEqualTo("true");
-            
assertThat(config.startup_checks.get(StartupChecks.StartupCheckType.check_data_resurrection).get("heartbeat_file")).isEqualTo("/var/lib/cassandra/data/cassandra-heartbeat");
+            
assertThat(config.startup_checks.get("check_data_resurrection").get("enabled")).isEqualTo("true");
+            
assertThat(config.startup_checks.get("check_data_resurrection").get("heartbeat_file")).isEqualTo("/var/lib/cassandra/data/cassandra-heartbeat");
         }
 
         try (WithEnvironment ignore  = new 
WithEnvironment(CassandraRelevantEnv.CASSANDRA_ALLOW_CONFIG_ENVIRONMENT_VARIABLES.getKey(),
 Boolean.TRUE.toString(),
diff --git 
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
index 7c0d78dbd5..4eea222cf3 100644
--- 
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
+++ 
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
@@ -34,7 +34,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.config.StartupChecksConfiguration;
 import org.apache.cassandra.distributed.shared.WithProperties;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.File;
@@ -55,7 +55,6 @@ import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.TOKEN;
 import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.UNSUPPORTED_VERSION;
 import static org.apache.cassandra.service.FileSystemOwnershipCheck.VERSION;
 import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.VOLUME_COUNT;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_filesystem_ownership;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -66,7 +65,7 @@ public abstract class AbstractFilesystemOwnershipCheckTest
     protected File tempDir;
     protected String token;
 
-    protected StartupChecksOptions options = new StartupChecksOptions();
+    protected StartupChecksConfiguration options;
 
     static WithProperties properties;
 
@@ -99,7 +98,7 @@ public abstract class AbstractFilesystemOwnershipCheckTest
     }
 
     private static void executeAndFail(FileSystemOwnershipCheck checker,
-                                       StartupChecksOptions options,
+                                       StartupChecksConfiguration options,
                                        String messageTemplate,
                                        Object...messageArgs)
     {
@@ -206,7 +205,7 @@ public abstract class AbstractFilesystemOwnershipCheckTest
     public void skipCheckDisabledIfSystemPropertyIsEmpty() throws Exception
     {
         // no exceptions thrown from the supplier because the check is skipped
-        options.disable(check_filesystem_ownership);
+        options.disable("check_filesystem_ownership");
         System.clearProperty(FILE_SYSTEM_CHECK_ENABLE.getKey());
         AbstractFilesystemOwnershipCheckTest.checker(() -> { throw new 
RuntimeException("FAIL"); }).execute(options);
     }
@@ -215,7 +214,7 @@ public abstract class AbstractFilesystemOwnershipCheckTest
     public void skipCheckDisabledIfSystemPropertyIsFalseButOptionsEnabled() 
throws Exception
     {
         // no exceptions thrown from the supplier because the check is skipped
-        options.enable(check_filesystem_ownership);
+        options.enable("check_filesystem_ownership");
         FILE_SYSTEM_CHECK_ENABLE.setBoolean(false);
         AbstractFilesystemOwnershipCheckTest.checker(() -> { throw new 
RuntimeException("FAIL"); }).execute(options);
     }
@@ -230,7 +229,7 @@ public abstract class AbstractFilesystemOwnershipCheckTest
     @Test
     public void checkEnabledButClusterPropertyIsUnset()
     {
-        
Assume.assumeFalse(options.getConfig(check_filesystem_ownership).containsKey("ownership_token"));
+        
Assume.assumeFalse(options.getConfig("check_filesystem_ownership").containsKey("ownership_token"));
         FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.clearValue(); // checkstyle: 
suppress nearby 'clearValueSystemPropertyUsage'
         AbstractFilesystemOwnershipCheckTest.executeAndFail(checker(tempDir), 
options, MISSING_PROPERTY, FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey());
     }
diff --git a/test/unit/org/apache/cassandra/service/StartupChecksTest.java 
b/test/unit/org/apache/cassandra/service/StartupChecksTest.java
index 8cb204f795..7d5ba0e40c 100644
--- a/test/unit/org/apache/cassandra/service/StartupChecksTest.java
+++ b/test/unit/org/apache/cassandra/service/StartupChecksTest.java
@@ -25,8 +25,10 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.spi.FileSystemProvider;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
@@ -40,12 +42,14 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.Config.DiskAccessMode;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.config.StartupChecksConfiguration;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
@@ -64,13 +68,18 @@ import static java.util.Collections.singletonList;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.TEST_INVALID_LEGACY_SSTABLE_ROOT;
 import static org.apache.cassandra.io.util.FileUtils.createTempFile;
 import static 
org.apache.cassandra.service.DataResurrectionCheck.HEARTBEAT_FILE_CONFIG_PROPERTY;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_data_resurrection;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class StartupChecksTest
@@ -88,7 +97,7 @@ public class StartupChecksTest
     Path sstableDir;
     static File heartbeatFile;
 
-    StartupChecksOptions options = new StartupChecksOptions();
+    StartupChecksConfiguration options = new StartupChecksConfiguration(new 
StartupChecks().withDefaultTests(), new HashMap<>());
 
     @BeforeClass
     public static void setupServer()
@@ -109,8 +118,8 @@ public class StartupChecksTest
         sstableDir = Paths.get(dataDir.absolutePath(), "Keyspace1", 
"Standard1");
         Files.createDirectories(sstableDir);
 
-        options.enable(check_data_resurrection);
-        options.getConfig(check_data_resurrection)
+        options.enable("check_data_resurrection");
+        options.getConfig("check_data_resurrection")
                .put(HEARTBEAT_FILE_CONFIG_PROPERTY, 
heartbeatFile.absolutePath());
 
         startupChecks = new StartupChecks();
@@ -183,13 +192,13 @@ public class StartupChecksTest
     public void testGetReadAheadKBPath()
     {
         Path sdaDirectory = StartupChecks.getReadAheadKBPath("/dev/sda12");
-        Assert.assertEquals(Paths.get("/sys/block/sda/queue/read_ahead_kb"), 
sdaDirectory);
+        assertEquals(Paths.get("/sys/block/sda/queue/read_ahead_kb"), 
sdaDirectory);
 
         Path scsiDirectory = StartupChecks.getReadAheadKBPath("/dev/scsi1");
-        Assert.assertEquals(Paths.get("/sys/block/scsi/queue/read_ahead_kb"), 
scsiDirectory);
+        assertEquals(Paths.get("/sys/block/scsi/queue/read_ahead_kb"), 
scsiDirectory);
 
         Path dirWithoutNumbers = StartupChecks.getReadAheadKBPath("/dev/sca");
-        Assert.assertEquals(Paths.get("/sys/block/sca/queue/read_ahead_kb"), 
dirWithoutNumbers);
+        assertEquals(Paths.get("/sys/block/sca/queue/read_ahead_kb"), 
dirWithoutNumbers);
 
         Path invalidDir = StartupChecks.getReadAheadKBPath("/invaliddir/xpto");
         Assert.assertNull(invalidDir);
@@ -254,6 +263,157 @@ public class StartupChecksTest
         testKernelBug1057843Check("ext4", DiskAccessMode.mmap, new 
Semver("6.1.64.1-generic"), false);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testExternalCheckIsLoaded() throws StartupException
+    {
+        StartupCheck externalCheck = spy(new StartupCheck()
+        {
+            @Override
+            public String name()
+            {
+                return "my_custom_check";
+            }
+
+            @Override
+            public void execute(StartupChecksConfiguration configuration)
+            {
+
+            }
+
+            @Override
+            public boolean isConfigurable()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean isDisabledByDefault()
+            {
+                return false;
+            }
+        });
+
+        ServiceLoader<StartupCheck> loader = mock(ServiceLoader.class);
+        doReturn(List.of(externalCheck).iterator()).when(loader).iterator();
+        try (MockedStatic<ServiceLoader> serviceLoader = 
Mockito.mockStatic(ServiceLoader.class)) {
+            serviceLoader.when(() -> 
ServiceLoader.load(StartupCheck.class)).thenReturn(loader);
+
+            StartupChecks checks = new 
StartupChecks().withDefaultTests().withServiceLoaderTests();
+
+            StartupCheck myCustomCheck = checks.getCheck("my_custom_check");
+            assertNotNull(myCustomCheck);
+
+            StartupChecksConfiguration configuration = new 
StartupChecksConfiguration(checks, new HashMap<>());
+
+            checks.verify(configuration);
+            verify(externalCheck, times(1)).execute(configuration);
+        }
+    }
+
+    @Test
+    public void testLoadingCustomChecksWithNotUniqueNameIsForbidden()
+    {
+        StartupCheck externalCheck = spy(new StartupCheck()
+        {
+            @Override
+            public String name()
+            {
+                return "my_custom_check";
+            }
+
+            @Override
+            public void execute(StartupChecksConfiguration configuration)
+            {
+
+            }
+
+            @Override
+            public boolean isConfigurable()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean isDisabledByDefault()
+            {
+                return false;
+            }
+        });
+
+        ServiceLoader<StartupCheck> loader = mock(ServiceLoader.class);
+
+        // two times! We model loading of two checks with same name
+        doReturn(List.of(externalCheck, 
externalCheck).iterator()).when(loader).iterator();
+
+        try (MockedStatic<ServiceLoader> serviceLoader = 
Mockito.mockStatic(ServiceLoader.class)) {
+            serviceLoader.when(() -> 
ServiceLoader.load(StartupCheck.class)).thenReturn(loader);
+
+            try
+            {
+                new 
StartupChecks().withDefaultTests().withServiceLoaderTests();
+                fail("it should not be possible to specify two custom checks 
with same name");
+            }
+            catch (Throwable t)
+            {
+                assertEquals("There was an attempt to load custom startup 
checks with same name which is ambiguous: [my_custom_check]",
+                             t.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testCustomCheckHasSameNameAsInBuiltCheck()
+    {
+        StartupCheck externalCheck = spy(new StartupCheck()
+        {
+            @Override
+            public String name()
+            {
+                // for the sake of it being same as one of in-builts
+                return StartupChecks.checkLz4Native.name();
+            }
+
+            @Override
+            public void execute(StartupChecksConfiguration configuration)
+            {
+
+            }
+
+            @Override
+            public boolean isConfigurable()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean isDisabledByDefault()
+            {
+                return false;
+            }
+        });
+
+        ServiceLoader<StartupCheck> loader = mock(ServiceLoader.class);
+
+        // two times! We model loading of two checks with same name
+        doReturn(List.of(externalCheck, 
externalCheck).iterator()).when(loader).iterator();
+
+        try (MockedStatic<ServiceLoader> serviceLoader = 
Mockito.mockStatic(ServiceLoader.class)) {
+            serviceLoader.when(() -> 
ServiceLoader.load(StartupCheck.class)).thenReturn(loader);
+
+            try
+            {
+                new 
StartupChecks().withDefaultTests().withServiceLoaderTests();
+                fail("it should not be possible to specify a check with same 
name as in-built check");
+            }
+            catch (Throwable t)
+            {
+                assertEquals("There was an attempt to load custom startup 
checks with same name which is ambiguous: [" + 
StartupChecks.checkLz4Native.name() + ']',
+                             t.getMessage());
+            }
+        }
+    }
+
     private <R> void withPathOverriddingFileSystem(Map<String, String> 
pathOverrides, Callable<? extends R> callable) throws Exception
     {
         Map<String, FileStore> fileStores = 
Set.copyOf(pathOverrides.values()).stream().collect(Collectors.toMap(s -> s, s 
-> {
diff --git 
a/test/unit/org/apache/cassandra/service/SystemPropertiesBasedFileSystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/SystemPropertiesBasedFileSystemOwnershipCheckTest.java
index 557b26dc95..0e99773c89 100644
--- 
a/test/unit/org/apache/cassandra/service/SystemPropertiesBasedFileSystemOwnershipCheckTest.java
+++ 
b/test/unit/org/apache/cassandra/service/SystemPropertiesBasedFileSystemOwnershipCheckTest.java
@@ -18,9 +18,12 @@
 
 package org.apache.cassandra.service;
 
+import java.util.HashMap;
+
 import org.junit.Before;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.StartupChecksConfiguration;
 import org.apache.cassandra.distributed.shared.WithProperties;
 
 public class SystemPropertiesBasedFileSystemOwnershipCheckTest extends 
AbstractFilesystemOwnershipCheckTest
@@ -31,5 +34,7 @@ public class 
SystemPropertiesBasedFileSystemOwnershipCheckTest extends AbstractF
         super.setup();
         properties = new 
WithProperties().set(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN,
 token)
                                          
.set(CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE, true);
+        StartupChecks startupChecks = new StartupChecks().withTest(new 
FileSystemOwnershipCheck());
+        options = new StartupChecksConfiguration(startupChecks, new 
HashMap<>());
     }
-}
\ No newline at end of file
+}
diff --git 
a/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
index 9b962e9225..3bb3259790 100644
--- 
a/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
+++ 
b/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
@@ -18,10 +18,13 @@
 
 package org.apache.cassandra.service;
 
+import java.util.HashMap;
+
 import org.junit.Before;
 
-import static 
org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_filesystem_ownership;
+import org.apache.cassandra.config.StartupChecksConfiguration;
+
+import static 
org.apache.cassandra.config.StartupChecksConfiguration.ENABLED_PROPERTY;
 
 public class YamlBasedFileSystemOwnershipCheckTest extends 
AbstractFilesystemOwnershipCheckTest
 {
@@ -29,7 +32,13 @@ public class YamlBasedFileSystemOwnershipCheckTest extends 
AbstractFilesystemOwn
     public void setup()
     {
         super.setup();
-        options.getConfig(check_filesystem_ownership).put(ENABLED_PROPERTY, 
"true");
-        options.getConfig(check_filesystem_ownership).put("ownership_token", 
token);
+        StartupChecks startupChecks = new StartupChecks().withTest(new 
FileSystemOwnershipCheck());
+        options = new StartupChecksConfiguration(startupChecks, new 
HashMap<>() {{
+            put("check_filesystem_ownership", new HashMap<>()
+            {{
+                put(ENABLED_PROPERTY, "true");
+                put("ownership_token", token);
+            }});
+        }});
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to