This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eac9d92  make startup checks configurable
eac9d92 is described below

commit eac9d925621dc1757ac88ac7160dc0b2bd3e0015
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Fri Feb 18 14:40:08 2022 +0100

    make startup checks configurable
    
    patch by Stefan Miklosovic; reviewed by Paulo Motta for CASSANDRA-17220
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  14 +
 .../config/CassandraRelevantProperties.java        |  27 +-
 src/java/org/apache/cassandra/config/Config.java   |   5 +
 .../cassandra/config/DatabaseDescriptor.java       |  15 +
 .../cassandra/config/StartupChecksOptions.java     |  97 ++++
 .../apache/cassandra/service/CassandraDaemon.java  |   2 +-
 .../service/FileSystemOwnershipCheck.java          |  83 +++-
 .../org/apache/cassandra/service/StartupCheck.java |  16 +-
 .../apache/cassandra/service/StartupChecks.java    | 212 +++++---
 .../config/DatabaseDescriptorRefTest.java          |   3 +-
 .../cassandra/config/StartupCheckOptionsTest.java  | 110 +++++
 .../AbstractFilesystemOwnershipCheckTest.java      | 535 +++++++++++++++++++++
 .../service/FileSystemOwnershipCheckTest.java      | 490 -------------------
 .../cassandra/service/StartupChecksTest.java       |  15 +-
 ...ropertiesBasedFileSystemOwnershipCheckTest.java |  34 ++
 .../YamlBasedFileSystemOwnershipCheckTest.java     |  35 ++
 17 files changed, 1123 insertions(+), 571 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index de90a56..39a0f64 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Make startup checks configurable (CASSANDRA-17220)
  * Add guardrail for number of partition keys on IN queries (CASSANDRA-17186)
  * update Python test framework from nose to pytest (CASSANDRA-17293)
  * Fix improper CDC commit log segments deletion in non-blocking mode 
(CASSANDRA-17233)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bb8cb28..eecb453 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1620,3 +1620,17 @@ drop_compact_storage_enabled: false
 # The two thresholds default to -1 to disable. 
 # partition_keys_in_select_warn_threshold: -1
 # partition_keys_in_select_fail_threshold: -1
+
+# Startup Checks are executed as part of Cassandra startup process, not all of 
them
+# are configurable (so you can disable them) but these which are enumerated 
bellow.
+# Uncomment the startup checks and configure them appropriately to cover your 
needs.
+#
+#startup_checks:
+#  filesystem_ownership:
+#    enabled: false
+#    ownership_token: "sometoken" # (overriden by "CassandraOwnershipToken" 
system property)
+#    ownership_filename: ".cassandra_fs_ownership" # (overriden by 
"cassandra.fs_ownership_filename")
+#  dc:
+#    enabled: true # (overriden by cassandra.ignore_dc system property)
+#  rack:
+#    enabled: true # (overriden by cassandra.ignore_rack system property)
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 5e2f0a8..74ed13e 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.config;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.FileSystemOwnershipCheck;
 
 /** A class that extracts system properties for the cassandra node it runs 
within. */
 public enum CassandraRelevantProperties
@@ -93,8 +94,9 @@ public enum CassandraRelevantProperties
      */
     COM_SUN_MANAGEMENT_JMXREMOTE_RMI_PORT 
("com.sun.management.jmxremote.rmi.port", "0"),
 
-    /** Cassandra jmx remote port */
+    /** Cassandra jmx remote and local port */
     CASSANDRA_JMX_REMOTE_PORT("cassandra.jmx.remote.port"),
+    CASSANDRA_JMX_LOCAL_PORT("cassandra.jmx.local.port"),
 
     /** This property  indicates whether SSL is enabled for monitoring 
remotely. Default is set to false. */
     COM_SUN_MANAGEMENT_JMXREMOTE_SSL ("com.sun.management.jmxremote.ssl"),
@@ -225,6 +227,19 @@ public enum CassandraRelevantProperties
 
     
PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS("cassandra.paxos_repair_retry_timeout_millis", 
"60000"),
 
+    // startup checks properties
+    LIBJEMALLOC("cassandra.libjemalloc"),
+    @Deprecated // should be removed in favor of enable flag of relevant 
startup check (checkDatacenter)
+    IGNORE_DC("cassandra.ignore_dc"),
+    @Deprecated // should be removed in favor of enable flag of relevant 
startup check (checkRack)
+    IGNORE_RACK("cassandra.ignore_rack"),
+    @Deprecated // should be removed in favor of enable flag of relevant 
startup check (FileSystemOwnershipCheck)
+    FILE_SYSTEM_CHECK_ENABLE("cassandra.enable_fs_ownership_check"),
+    @Deprecated // should be removed in favor of flags in relevant startup 
check (FileSystemOwnershipCheck)
+    FILE_SYSTEM_CHECK_OWNERSHIP_FILENAME("cassandra.fs_ownership_filename", 
FileSystemOwnershipCheck.DEFAULT_FS_OWNERSHIP_FILENAME),
+    @Deprecated // should be removed in favor of flags in relevant startup 
check (FileSystemOwnershipCheck)
+    
FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN(FileSystemOwnershipCheck.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN),
+
     // properties for debugging simulator ASM output
     TEST_SIMULATOR_PRINT_ASM("cassandra.test.simulator.print_asm", "none"),
     TEST_SIMULATOR_PRINT_ASM_TYPES("cassandra.test.simulator.print_asm_types", 
""),
@@ -283,6 +298,16 @@ public enum CassandraRelevantProperties
     }
 
     /**
+     * Returns default value.
+     *
+     * @return default value, if any, otherwise null.
+     */
+    public String getDefaultValue()
+    {
+        return defaultVal;
+    }
+
+    /**
      * Gets the value of a system property as a String.
      * @return system property String value if it exists, overrideDefaultValue 
otherwise.
      */
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 910a3e8..15226cf 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.audit.AuditLogOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.fql.FullQueryLoggerOptions;
+import org.apache.cassandra.service.StartupChecks.StartupCheckType;
 
 /**
  * A class that contains configuration properties for the cassandra node it 
runs within.
@@ -767,6 +769,9 @@ public class Config
     public volatile boolean user_timestamps_enabled = true;
     public volatile boolean read_before_write_list_operations_enabled = true;
 
+    /** The configuration of startup checks. */
+    public volatile Map<StartupCheckType, Map<String, Object>> startup_checks 
= new HashMap<>();
+
     public enum PaxosVariant
     {
         v1_without_linearizable_reads, // with legacy semantics for read/read 
linearizability (i.e. not guaranteed)
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index af0b67a..9f88ce0 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -66,6 +66,8 @@ import org.apache.cassandra.locator.SeedProvider;
 import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.CacheService.CacheType;
+import org.apache.cassandra.service.FileSystemOwnershipCheck;
+import org.apache.cassandra.service.StartupChecks;
 import org.apache.cassandra.service.paxos.Paxos;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -156,6 +158,7 @@ public class DatabaseDescriptor
 
     /** The configuration for guardrails. */
     private static GuardrailsOptions guardrails;
+    private static StartupChecksOptions startupChecksOptions;
 
     private static Function<CommitLog, AbstractCommitLogSegmentManager> 
commitLogSegmentMgrProvider = c -> DatabaseDescriptor.isCDCEnabled()
                                        ? new CommitLogSegmentManagerCDC(c, 
DatabaseDescriptor.getCommitLogLocation())
@@ -368,6 +371,8 @@ public class DatabaseDescriptor
         applySslContext();
 
         applyGuardrails();
+
+        applyStartupChecks();
     }
 
     private static void applySimpleConfig()
@@ -899,6 +904,16 @@ public class DatabaseDescriptor
         }
     }
 
+    public static StartupChecksOptions getStartupChecksOptions()
+    {
+        return startupChecksOptions;
+    }
+
+    private static void applyStartupChecks()
+    {
+        startupChecksOptions = new StartupChecksOptions(conf.startup_checks);
+    }
+
     private static String storagedirFor(String type)
     {
         return storagedir(type + "_directory") + File.pathSeparator() + type;
diff --git a/src/java/org/apache/cassandra/config/StartupChecksOptions.java 
b/src/java/org/apache/cassandra/config/StartupChecksOptions.java
new file mode 100644
index 0000000..6eb3189
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/StartupChecksOptions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.service.StartupChecks.StartupCheckType;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.non_configurable_check;
+
+public class StartupChecksOptions
+{
+    public static final String ENABLED_PROPERTY = "enabled";
+
+    private final Map<StartupCheckType, Map<String, Object>> options = new 
EnumMap<>(StartupCheckType.class);
+
+    public StartupChecksOptions()
+    {
+        this(Collections.emptyMap());
+    }
+
+    public StartupChecksOptions(final Map<StartupCheckType, Map<String, 
Object>> options)
+    {
+        this.options.putAll(options);
+        apply();
+    }
+
+    public void set(final StartupCheckType startupCheckType, final String key, 
final Object value)
+    {
+        if (startupCheckType != non_configurable_check)
+            options.get(startupCheckType).put(key, value);
+    }
+
+    public void enable(final StartupCheckType startupCheckType)
+    {
+        set(startupCheckType, ENABLED_PROPERTY, TRUE);
+    }
+
+    public void disable(final StartupCheckType startupCheckType)
+    {
+        if (startupCheckType != non_configurable_check)
+            set(startupCheckType, ENABLED_PROPERTY, FALSE);
+    }
+
+    public boolean isEnabled(final StartupCheckType startupCheckType)
+    {
+        return 
Boolean.parseBoolean(options.get(startupCheckType).get(ENABLED_PROPERTY).toString());
+    }
+
+    public boolean isDisabled(final StartupCheckType startupCheckType)
+    {
+        return !isEnabled(startupCheckType);
+    }
+
+    public Map<String, Object> getConfig(final StartupCheckType 
startupCheckType)
+    {
+        return options.get(startupCheckType);
+    }
+
+    private void apply()
+    {
+        for (final StartupCheckType startupCheckType : 
StartupCheckType.values())
+        {
+            final Map<String, Object> configMap = 
options.computeIfAbsent(startupCheckType, k -> new HashMap<>());
+            if (configMap.containsKey(ENABLED_PROPERTY))
+                configMap.putIfAbsent(ENABLED_PROPERTY, FALSE);
+            else if (startupCheckType.disabledByDefault)
+                configMap.put(ENABLED_PROPERTY, FALSE);
+            else
+                configMap.put(ENABLED_PROPERTY, TRUE);
+        }
+        // clear if we put anything into it by accident & enable this check 
every time no matter what
+        options.get(non_configurable_check).clear();
+        options.get(non_configurable_check).put(ENABLED_PROPERTY, TRUE);
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 9848b96..82eadbc 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -484,7 +484,7 @@ public class CassandraDaemon
     {
         try
         {
-            startupChecks.verify();
+            startupChecks.verify(DatabaseDescriptor.getStartupChecksOptions());
         }
         catch (StartupException e)
         {
diff --git 
a/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java 
b/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
index fe4427c..0d214e6 100644
--- a/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
+++ b/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
@@ -33,10 +33,14 @@ import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.StartupChecksOptions;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.File;
 
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.filesystem_ownership;
+
 /**
  * Ownership markers on disk are compatible with the java property file format.
  * 
(https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html#load-java.io.Reader-)
@@ -65,19 +69,15 @@ import org.apache.cassandra.io.util.File;
  * - The value of the volume_count property must be an int which must matches
  *   the number of distinct marker files found when traversing the filesystem.
  *
- * In overridden implementations, you will need to override {@link 
#constructTokenFromProperties()}
+ * In overridden implementations, you will need to override {@link 
#constructTokenFromProperties(Map)}
  * and add the related *_PROPERTY values you will want the system to check on 
startup to confirm ownership.
  */
 public class FileSystemOwnershipCheck implements StartupCheck
 {
     private static final Logger logger = 
LoggerFactory.getLogger(FileSystemOwnershipCheck.class);
 
-    // System properties
-    static final String ENABLE_FS_OWNERSHIP_CHECK_PROPERTY      = 
"cassandra.enable_fs_ownership_check";
-    static final String FS_OWNERSHIP_FILENAME_PROPERTY          = 
"cassandra.fs_ownership_filename";
-    static final String DEFAULT_FS_OWNERSHIP_FILENAME           = 
".cassandra_fs_ownership";
-
-    static final String OWNERSHIP_TOKEN                         = 
"CassandraOwnershipToken";
+    public static final String FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN = 
"CassandraOwnershipToken";
+    public static final String DEFAULT_FS_OWNERSHIP_FILENAME = 
".cassandra_fs_ownership";
 
     // Ownership file properties
     static final String VERSION                                 = "version";
@@ -86,7 +86,7 @@ public class FileSystemOwnershipCheck implements StartupCheck
 
     // Error strings
     static final String ERROR_PREFIX                            = "FS 
ownership check failed; ";
-    static final String MISSING_SYSTEM_PROPERTY                 = "system 
property '%s' required for fs ownership check not supplied";
+    static final String MISSING_PROPERTY                        = "property 
'%s' required for fs ownership check not supplied";
     static final String NO_OWNERSHIP_FILE                       = "no file 
found in tree for %s";
     static final String MULTIPLE_OWNERSHIP_FILES                = "multiple 
files found in tree for %s";
     static final String INCONSISTENT_FILES_FOUND                = 
"inconsistent ownership files found on disk: %s";
@@ -112,16 +112,25 @@ public class FileSystemOwnershipCheck implements 
StartupCheck
         this.dirs = dirs;
     }
 
-    public void execute() throws StartupException
+    @Override
+    public StartupChecks.StartupCheckType getStartupCheckType()
+    {
+        return filesystem_ownership;
+    }
+
+    @Override
+    public void execute(StartupChecksOptions options) throws StartupException
     {
-        if (!Boolean.getBoolean(ENABLE_FS_OWNERSHIP_CHECK_PROPERTY))
+        if (!isEnabled(options))
         {
-            logger.info("Filesystem ownership check is not enabled: " + 
ENABLE_FS_OWNERSHIP_CHECK_PROPERTY);
+            logger.info("Filesystem ownership check is not enabled.");
             return;
         }
 
-        String expectedToken = constructTokenFromProperties();
-        String tokenFilename = 
System.getProperty(FS_OWNERSHIP_FILENAME_PROPERTY, 
DEFAULT_FS_OWNERSHIP_FILENAME);
+        Map<String, Object> config = options.getConfig(getStartupCheckType());
+
+        String expectedToken = constructTokenFromProperties(config);
+        String tokenFilename = getFsOwnershipFilename(config);
         Map<String, Integer> foundPerTargetDir = new HashMap<>();
         Map<Path, Properties> foundProperties = new HashMap<>();
 
@@ -218,11 +227,11 @@ public class FileSystemOwnershipCheck implements 
StartupCheck
     }
 
     /** In version 1, we check and return the ownership token. Extend this for 
custom ownership hierarchies. */
-    protected String constructTokenFromProperties() throws StartupException
+    protected String constructTokenFromProperties(Map<String, Object> config) 
throws StartupException
     {
-        String cluster = System.getProperty(OWNERSHIP_TOKEN);
+        String cluster = getOwnershipToken(config);
         if (null == cluster || cluster.isEmpty())
-            throw exception(String.format(MISSING_SYSTEM_PROPERTY, 
OWNERSHIP_TOKEN));
+            throw exception(String.format(MISSING_PROPERTY, 
FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN));
         return cluster;
     }
 
@@ -264,4 +273,46 @@ public class FileSystemOwnershipCheck implements 
StartupCheck
     {
         return new StartupException(StartupException.ERR_WRONG_DISK_STATE, 
ERROR_PREFIX + message);
     }
+
+    public boolean isEnabled(StartupChecksOptions options)
+    {
+        boolean enabledFromYaml = options.isEnabled(getStartupCheckType());
+        return 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE.getBoolean(enabledFromYaml);
+    }
+
+    public String getFsOwnershipFilename(Map<String, Object> config)
+    {
+        if 
(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_FILENAME.isPresent())
+        {
+            logger.warn(String.format("Cassandra system property flag %s is 
deprecated and you should " +
+                                      "use startup check configuration in 
cassandra.yaml",
+                                      
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_FILENAME.getKey()));
+            return 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_FILENAME.getString();
+        }
+        else
+        {
+            Object fsOwnershipFilename = config.get("ownership_filename");
+            return fsOwnershipFilename == null
+                   ? 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_FILENAME.getDefaultValue()
+                   : (String) fsOwnershipFilename;
+        }
+    }
+
+    public String getOwnershipToken(Map<String, Object> config)
+    {
+        if 
(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.isPresent())
+        {
+            logger.warn(String.format("Cassandra system property flag %s is 
deprecated and you should " +
+                                      "use startup check configuration in 
cassandra.yaml",
+                                      
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey()));
+            return 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getString();
+        }
+        else
+        {
+            Object ownershipToken = config.get("ownership_token");
+            return ownershipToken == null
+                   ? 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getDefaultValue()
+                   : (String) ownershipToken;
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/StartupCheck.java 
b/src/java/org/apache/cassandra/service/StartupCheck.java
index 649f13c..567120e 100644
--- a/src/java/org/apache/cassandra/service/StartupCheck.java
+++ b/src/java/org/apache/cassandra/service/StartupCheck.java
@@ -17,14 +17,16 @@
  */
 package org.apache.cassandra.service;
 
+import org.apache.cassandra.config.StartupChecksOptions;
 import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.service.StartupChecks.StartupCheckType;
 
 /**
  * A test to determine if the system is in a valid state to start up.
  * Some implementations may not actually halt startup, but provide
  * information or advice on tuning and non-fatal environmental issues (e.g. 
like
  * checking for and warning about suboptimal JVM settings).
- * Other checks may indicate that they system is not in a correct state to be 
started.
+ * Other checks may indicate that the system is not in a correct state to be 
started.
  * Examples include missing or unaccessible data directories, unreadable 
sstables and
  * misconfiguration of cluster_name in cassandra.yaml.
  *
@@ -39,8 +41,18 @@ public interface StartupCheck
      * test should log a message regarding the reason for the failure and
      * ideally the steps required to remedy the problem.
      *
+     * @param startupChecksOptions all options from descriptor
      * @throws org.apache.cassandra.exceptions.StartupException if the test 
determines
      * that the environement or system is not in a safe state to startup
      */
-    void execute() throws StartupException;
+    void execute(StartupChecksOptions startupChecksOptions) throws 
StartupException;
+
+    /**
+     *
+     * @return type of this startup check for configuration retrieval
+     */
+    default StartupCheckType getStartupCheckType()
+    {
+        return StartupCheckType.non_configurable_check;
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java 
b/src/java/org/apache/cassandra/service/StartupChecks.java
index 18f62b7..f67057d 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -33,6 +33,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang3.StringUtils;
 
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.StartupChecksOptions;
 import org.apache.cassandra.io.util.File;
 
 import org.slf4j.Logger;
@@ -85,6 +88,27 @@ import static 
org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
  */
 public class StartupChecks
 {
+    public enum StartupCheckType
+    {
+        // non-configurable check is always enabled for execution
+        non_configurable_check,
+        filesystem_ownership(true),
+        dc,
+        rack;
+
+        public final boolean disabledByDefault;
+
+        StartupCheckType()
+        {
+            this(false);
+        }
+
+        StartupCheckType(boolean disabledByDefault)
+        {
+            this.disabledByDefault = disabledByDefault;
+        }
+    }
+
     private static final Logger logger = 
LoggerFactory.getLogger(StartupChecks.class);
     // List of checks to run before starting up. If any test reports failure, 
startup will be halted.
     private final List<StartupCheck> preFlightChecks = new ArrayList<>();
@@ -129,18 +153,23 @@ public class StartupChecks
      * Run the configured tests and return a report detailing the results.
      * @throws org.apache.cassandra.exceptions.StartupException if any test 
determines that the
      * system is not in an valid state to startup
+     * @param options options to pass to respective checks for their 
configration
      */
-    public void verify() throws StartupException
+    public void verify(StartupChecksOptions options) throws StartupException
     {
         for (StartupCheck test : preFlightChecks)
-            test.execute();
+            test.execute(options);
     }
 
     public static final StartupCheck checkJemalloc = new StartupCheck()
     {
-        public void execute()
+        @Override
+        public void execute(StartupChecksOptions options)
         {
-            String jemalloc = System.getProperty("cassandra.libjemalloc");
+            if (options.isDisabled(getStartupCheckType()))
+                return;
+
+            String jemalloc = 
CassandraRelevantProperties.LIBJEMALLOC.getString();
             if (jemalloc == null)
                 logger.warn("jemalloc shared library could not be preloaded to 
speed up memory allocations");
             else if ("-".equals(jemalloc))
@@ -150,14 +179,21 @@ public class StartupChecks
         }
     };
 
-    public static final StartupCheck checkLz4Native = () -> {
-        try
-        {
-            LZ4Factory.nativeInstance(); // make sure native loads
-        }
-        catch (AssertionError | LinkageError e)
+    public static final StartupCheck checkLz4Native = new StartupCheck()
+    {
+        @Override
+        public void execute(StartupChecksOptions options)
         {
-            logger.warn("lz4-java was unable to load native libraries; this 
will lower the performance of lz4 (network/sstables/etc.): {}", 
Throwables.getRootCause(e).getMessage());
+            if (options.isDisabled(getStartupCheckType()))
+                return;
+            try
+            {
+                LZ4Factory.nativeInstance(); // make sure native loads
+            }
+            catch (AssertionError | LinkageError e)
+            {
+                logger.warn("lz4-java was unable to load native libraries; 
this will lower the performance of lz4 (network/sstables/etc.): {}", 
Throwables.getRootCause(e).getMessage());
+            }
         }
     };
 
@@ -169,8 +205,12 @@ public class StartupChecks
          * We use this to ensure the system clock is at least somewhat correct 
at startup.
          */
         private static final long EARLIEST_LAUNCH_DATE = 1215820800000L;
-        public void execute() throws StartupException
+
+        @Override
+        public void execute(StartupChecksOptions options) throws 
StartupException
         {
+            if (options.isDisabled(getStartupCheckType()))
+                return;
             long now = currentTimeMillis();
             if (now < EARLIEST_LAUNCH_DATE)
                 throw new 
StartupException(StartupException.ERR_WRONG_MACHINE_STATE,
@@ -181,13 +221,16 @@ public class StartupChecks
 
     public static final StartupCheck checkJMXPorts = new StartupCheck()
     {
-        public void execute()
+        @Override
+        public void execute(StartupChecksOptions options)
         {
-            String jmxPort = System.getProperty("cassandra.jmx.remote.port");
+            if (options.isDisabled(getStartupCheckType()))
+                return;
+            String jmxPort = 
CassandraRelevantProperties.CASSANDRA_JMX_REMOTE_PORT.getString();
             if (jmxPort == null)
             {
                 logger.warn("JMX is not enabled to receive remote connections. 
Please see cassandra-env.sh for more info.");
-                jmxPort = System.getProperty("cassandra.jmx.local.port");
+                jmxPort = 
CassandraRelevantProperties.CASSANDRA_JMX_LOCAL_PORT.toString();
                 if (jmxPort == null)
                     logger.error("cassandra.jmx.local.port missing from 
cassandra-env.sh, unable to start local JMX service.");
             }
@@ -200,8 +243,11 @@ public class StartupChecks
 
     public static final StartupCheck checkJMXProperties = new StartupCheck()
     {
-        public void execute()
+        @Override
+        public void execute(StartupChecksOptions options)
         {
+            if (options.isDisabled(getStartupCheckType()))
+                return;
             if (COM_SUN_MANAGEMENT_JMXREMOTE_PORT.isPresent())
             {
                 logger.warn("Use of com.sun.management.jmxremote.port at 
startup is deprecated. " +
@@ -212,8 +258,11 @@ public class StartupChecks
 
     public static final StartupCheck inspectJvmOptions = new StartupCheck()
     {
-        public void execute()
+        @Override
+        public void execute(StartupChecksOptions options)
         {
+            if (options.isDisabled(getStartupCheckType()))
+                return;
             // log warnings for different kinds of sub-optimal JVMs.  tldr use 
64-bit Oracle >= 1.6u32
             if (!DatabaseDescriptor.hasLargeAddressSpace())
                 logger.warn("32bit JVM detected.  It is recommended to run 
Cassandra on a 64bit JVM for better performance.");
@@ -271,8 +320,11 @@ public class StartupChecks
 
     public static final StartupCheck checkNativeLibraryInitialization = new 
StartupCheck()
     {
-        public void execute() throws StartupException
+        @Override
+        public void execute(StartupChecksOptions options) throws 
StartupException
         {
+            if (options.isDisabled(getStartupCheckType()))
+                return;
             // Fail-fast if the native library could not be linked.
             if (!NativeLibrary.isAvailable())
                 throw new 
StartupException(StartupException.ERR_WRONG_MACHINE_STATE, "The native library 
could not be initialized properly. ");
@@ -281,8 +333,11 @@ public class StartupChecks
 
     public static final StartupCheck initSigarLibrary = new StartupCheck()
     {
-        public void execute()
+        @Override
+        public void execute(StartupChecksOptions options)
         {
+            if (options.isDisabled(getStartupCheckType()))
+                return;
             SigarLibrary.instance.warnIfRunningInDegradedMode();
         }
     };
@@ -323,9 +378,9 @@ public class StartupChecks
         }
 
         @Override
-        public void execute()
+        public void execute(StartupChecksOptions options)
         {
-            if (!FBUtilities.isLinux)
+            if (options.isDisabled(getStartupCheckType()) || 
!FBUtilities.isLinux)
                 return;
 
             String[] dataDirectories = 
DatabaseDescriptor.getRawConfig().data_file_directories;
@@ -397,9 +452,10 @@ public class StartupChecks
             return -1;
         }
 
-        public void execute()
+        @Override
+        public void execute(StartupChecksOptions options)
         {
-            if (!FBUtilities.isLinux)
+            if (options.isDisabled(getStartupCheckType()) || 
!FBUtilities.isLinux)
                 return;
 
             if (DatabaseDescriptor.getDiskAccessMode() == 
Config.DiskAccessMode.standard &&
@@ -414,39 +470,48 @@ public class StartupChecks
         }
     };
 
-    public static final StartupCheck checkDataDirs = () ->
+    public static final StartupCheck checkDataDirs = new StartupCheck()
     {
-        // check all directories(data, commitlog, saved cache) for existence 
and permission
-        Iterable<String> dirs = 
Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
-                                                 
Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
-                                                               
DatabaseDescriptor.getSavedCachesLocation(),
-                                                               
DatabaseDescriptor.getHintsDirectory().absolutePath()));
-        for (String dataDir : dirs)
+        @Override
+        public void execute(StartupChecksOptions options) throws 
StartupException
         {
-            logger.debug("Checking directory {}", dataDir);
-            File dir = new File(dataDir);
-
-            // check that directories exist.
-            if (!dir.exists())
+            if (options.isDisabled(getStartupCheckType()))
+                return;
+            // check all directories(data, commitlog, saved cache) for 
existence and permission
+            Iterable<String> dirs = 
Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
+                                                     
Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+                                                                   
DatabaseDescriptor.getSavedCachesLocation(),
+                                                                   
DatabaseDescriptor.getHintsDirectory().absolutePath()));
+            for (String dataDir : dirs)
             {
-                logger.warn("Directory {} doesn't exist", dataDir);
-                // if they don't, failing their creation, stop cassandra.
-                if (!dir.tryCreateDirectories())
+                logger.debug("Checking directory {}", dataDir);
+                File dir = new File(dataDir);
+
+                // check that directories exist.
+                if (!dir.exists())
+                {
+                    logger.warn("Directory {} doesn't exist", dataDir);
+                    // if they don't, failing their creation, stop cassandra.
+                    if (!dir.tryCreateDirectories())
+                        throw new 
StartupException(StartupException.ERR_WRONG_DISK_STATE,
+                                                   "Has no permission to 
create directory "+ dataDir);
+                }
+
+                // if directories exist verify their permissions
+                if (!Directories.verifyFullPermissions(dir, dataDir))
                     throw new 
StartupException(StartupException.ERR_WRONG_DISK_STATE,
-                                               "Has no permission to create 
directory "+ dataDir);
+                                               "Insufficient permissions on 
directory " + dataDir);
             }
-
-            // if directories exist verify their permissions
-            if (!Directories.verifyFullPermissions(dir, dataDir))
-                throw new 
StartupException(StartupException.ERR_WRONG_DISK_STATE,
-                                           "Insufficient permissions on 
directory " + dataDir);
         }
     };
 
     public static final StartupCheck checkSSTablesFormat = new StartupCheck()
     {
-        public void execute() throws StartupException
+        @Override
+        public void execute(StartupChecksOptions options) throws 
StartupException
         {
+            if (options.isDisabled(getStartupCheckType()))
+                return;
             final Set<String> invalid = new HashSet<>();
             final Set<String> nonSSTablePaths = new HashSet<>();
             
nonSSTablePaths.add(FileUtils.getCanonicalPath(DatabaseDescriptor.getCommitLogLocation()));
@@ -509,8 +574,11 @@ public class StartupChecks
 
     public static final StartupCheck checkSystemKeyspaceState = new 
StartupCheck()
     {
-        public void execute() throws StartupException
+        @Override
+        public void execute(StartupChecksOptions options) throws 
StartupException
         {
+            if (options.isDisabled(getStartupCheckType()))
+                return;
             // check the system keyspace to keep user from shooting self in 
foot by changing partitioner, cluster name, etc.
             // we do a one-off scrub of the system keyspace first; we can't 
load the list of the rest of the keyspaces,
             // until system keyspace is opened.
@@ -531,9 +599,18 @@ public class StartupChecks
 
     public static final StartupCheck checkDatacenter = new StartupCheck()
     {
-        public void execute() throws StartupException
+        @Override
+        public void execute(StartupChecksOptions options) throws 
StartupException
         {
-            if (!Boolean.getBoolean("cassandra.ignore_dc"))
+            boolean enabled = options.isEnabled(getStartupCheckType());
+            if (CassandraRelevantProperties.IGNORE_DC.isPresent())
+            {
+                logger.warn(String.format("Cassandra system property flag %s 
is deprecated and you should " +
+                                          "use startup check configuration in 
cassandra.yaml",
+                                          
CassandraRelevantProperties.IGNORE_DC.getKey()));
+                enabled = 
!Boolean.getBoolean(CassandraRelevantProperties.IGNORE_DC.getKey());
+            }
+            if (enabled)
             {
                 String storedDc = SystemKeyspace.getDatacenter();
                 if (storedDc != null)
@@ -549,13 +626,28 @@ public class StartupChecks
                 }
             }
         }
+
+        @Override
+        public StartupCheckType getStartupCheckType()
+        {
+            return StartupCheckType.dc;
+        }
     };
 
     public static final StartupCheck checkRack = new StartupCheck()
     {
-        public void execute() throws StartupException
+        @Override
+        public void execute(StartupChecksOptions options) throws 
StartupException
         {
-            if (!Boolean.getBoolean("cassandra.ignore_rack"))
+            boolean enabled = options.isEnabled(getStartupCheckType());
+            if (CassandraRelevantProperties.IGNORE_RACK.isPresent())
+            {
+                logger.warn(String.format("Cassandra system property flag %s 
is deprecated and you should " +
+                                          "use startup check configuration in 
cassandra.yaml",
+                                          
CassandraRelevantProperties.IGNORE_RACK.getKey()));
+                enabled = 
!Boolean.getBoolean(CassandraRelevantProperties.IGNORE_RACK.getKey());
+            }
+            if (enabled)
             {
                 String storedRack = SystemKeyspace.getRack();
                 if (storedRack != null)
@@ -571,13 +663,25 @@ public class StartupChecks
                 }
             }
         }
+
+        @Override
+        public StartupCheckType getStartupCheckType()
+        {
+            return StartupCheckType.rack;
+        }
     };
 
-    public static final StartupCheck checkLegacyAuthTables = () ->
+    public static final StartupCheck checkLegacyAuthTables = new StartupCheck()
     {
-        Optional<String> errMsg = checkLegacyAuthTablesMessage();
-        if (errMsg.isPresent())
-            throw new StartupException(StartupException.ERR_WRONG_CONFIG, 
errMsg.get());
+        @Override
+        public void execute(StartupChecksOptions options) throws 
StartupException
+        {
+            if (options.isDisabled(getStartupCheckType()))
+                return;
+            Optional<String> errMsg = checkLegacyAuthTablesMessage();
+            if (errMsg.isPresent())
+                throw new StartupException(StartupException.ERR_WRONG_CONFIG, 
errMsg.get());
+        }
     };
 
     @VisibleForTesting
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index b586245..735664c 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -215,7 +215,8 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.SmallestDurationSeconds",
     "org.apache.cassandra.config.SmallestDurationMilliseconds",
     "org.apache.cassandra.config.SmallestDataStorageKibibytes",
-    "org.apache.cassandra.config.SmallestDataStorageMebibytes"
+    "org.apache.cassandra.config.SmallestDataStorageMebibytes",
+    "org.apache.cassandra.config.StartupChecksOptions",
     };
 
     static final Set<String> checkedClasses = new 
HashSet<>(Arrays.asList(validClasses));
diff --git a/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java 
b/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java
new file mode 100644
index 0000000..8be5f92
--- /dev/null
+++ b/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.cassandra.service.StartupChecks.StartupCheckType;
+
+import static 
org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.filesystem_ownership;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.non_configurable_check;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StartupCheckOptionsTest
+{
+    @Test
+    public void testStartupOptionsConfigApplication()
+    {
+        Map<StartupCheckType, Map<String, Object>> config = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
+            put(filesystem_ownership, new HashMap<String, Object>() {{
+                put(ENABLED_PROPERTY, true);
+                put("key", "value");
+            }});
+        }};
+
+        StartupChecksOptions options = new StartupChecksOptions(config);
+
+        assertTrue(Boolean.parseBoolean(options.getConfig(filesystem_ownership)
+                                               .get(ENABLED_PROPERTY)
+                                               .toString()));
+
+        assertEquals("value", 
options.getConfig(filesystem_ownership).get("key"));
+        options.set(filesystem_ownership, "key", "value2");
+        assertEquals("value2", 
options.getConfig(filesystem_ownership).get("key"));
+
+        assertTrue(options.isEnabled(filesystem_ownership));
+        options.disable(filesystem_ownership);
+        assertFalse(options.isEnabled(filesystem_ownership));
+        assertTrue(options.isDisabled(filesystem_ownership));
+    }
+
+    @Test
+    public void testNoOptions()
+    {
+        StartupChecksOptions options = new StartupChecksOptions();
+
+        assertTrue(options.isEnabled(non_configurable_check));
+
+        // disabling does not to anything on non-configurable check
+        options.disable(non_configurable_check);
+        assertTrue(options.isEnabled(non_configurable_check));
+
+        options.set(non_configurable_check, "key", "value");
+
+        // we can not put anything into non-configurable check
+        
assertFalse(options.getConfig(non_configurable_check).containsKey("key"));
+    }
+
+    @Test
+    public void testEmptyDisabledValues()
+    {
+        Map<StartupCheckType, Map<String, Object>> emptyConfig = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
+            put(filesystem_ownership, new HashMap<>());
+        }};
+
+        Map<StartupCheckType, Map<String, Object>> emptyEnabledConfig = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
+            put(filesystem_ownership, new HashMap<String, Object>() {{
+                put(ENABLED_PROPERTY, null);
+            }});
+        }};
+
+        // empty enabled property or enabled property with null value are 
still counted as enabled
+
+        StartupChecksOptions options1 = new StartupChecksOptions(emptyConfig);
+        assertTrue(options1.isDisabled(filesystem_ownership));
+
+        StartupChecksOptions options2 = new 
StartupChecksOptions(emptyEnabledConfig);
+        assertTrue(options2.isDisabled(filesystem_ownership));
+    }
+
+    @Test
+    public void testChecksDisabledByDefaultAreNotEnabled()
+    {
+        Map<StartupCheckType, Map<String, Object>> emptyConfig = new 
EnumMap<>(StartupCheckType.class);
+        StartupChecksOptions options = new StartupChecksOptions(emptyConfig);
+        assertTrue(options.isDisabled(filesystem_ownership));
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
new file mode 100644
index 0000000..b70e514
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Random;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.io.util.File;
+
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.DEFAULT_FS_OWNERSHIP_FILENAME;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.ERROR_PREFIX;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.INCONSISTENT_FILES_FOUND;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.INVALID_FILE_COUNT;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.INVALID_PROPERTY_VALUE;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.MISMATCHING_TOKEN;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.MISSING_PROPERTY;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.MULTIPLE_OWNERSHIP_FILES;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.NO_OWNERSHIP_FILE;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.READ_EXCEPTION;
+import static org.apache.cassandra.service.FileSystemOwnershipCheck.TOKEN;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.UNSUPPORTED_VERSION;
+import static org.apache.cassandra.service.FileSystemOwnershipCheck.VERSION;
+import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.VOLUME_COUNT;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.filesystem_ownership;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Ignore
+public abstract class AbstractFilesystemOwnershipCheckTest
+{
+    protected File tempDir;
+    protected String token;
+
+    protected StartupChecksOptions options = new StartupChecksOptions();
+
+    protected void setup()
+    {
+        cleanTempDir();
+        tempDir = new File(com.google.common.io.Files.createTempDir());
+        token = makeRandomString(10);
+        
System.clearProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_FILENAME.getKey());
+        
System.clearProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey());
+        
System.clearProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE.getKey());
+    }
+
+    static File writeFile(File dir, String filename, Properties props) throws 
IOException
+    {
+        File tokenFile = new File(dir, filename); //checkstyle: permit this 
instantiation
+        assertTrue(tokenFile.createFileIfNotExists());
+        try (OutputStream os = Files.newOutputStream(tokenFile.toPath()))
+        {
+            props.store(os, "Test properties");
+        }
+        assertTrue(tokenFile.isReadable());
+        return tokenFile;
+    }
+
+    private static void executeAndFail(FileSystemOwnershipCheck checker,
+                                       StartupChecksOptions options,
+                                       String messageTemplate,
+                                       Object...messageArgs)
+    {
+        try
+        {
+            checker.execute(options);
+            fail("Expected an exception but none thrown");
+        } catch (StartupException e) {
+            String expected = ERROR_PREFIX + String.format(messageTemplate, 
messageArgs);
+            assertEquals(expected, e.getMessage());
+        }
+    }
+
+    private static Properties makeProperties(int version, int volumeCount, 
String token)
+    {
+        Properties props = new Properties();
+        props.setProperty(VERSION, Integer.toString(version));
+        props.setProperty(VOLUME_COUNT, Integer.toString(volumeCount));
+        props.setProperty(TOKEN, token);
+        return props;
+    }
+
+    private static File writeFile(File dir, int volumeCount, String token) 
throws IOException
+    {
+        return AbstractFilesystemOwnershipCheckTest.writeFile(dir, 
DEFAULT_FS_OWNERSHIP_FILENAME, 1, volumeCount, token);
+    }
+
+    private static File writeFile(File dir, final String filename, int 
version, int volumeCount, String token)
+    throws IOException
+    {
+        return writeFile(dir, filename, 
AbstractFilesystemOwnershipCheckTest.makeProperties(version, volumeCount, 
token));
+    }
+
+    private static File mkdirs(File parent, String path)
+    {
+        File childDir = new File(parent, path); //checkstyle: permit this 
instantiation
+        assertTrue(childDir.tryCreateDirectories());
+        assertTrue(childDir.exists());
+        return childDir;
+    }
+
+    private static FileSystemOwnershipCheck checker(Supplier<Iterable<String>> 
dirs)
+    {
+        return new FileSystemOwnershipCheck(dirs);
+    }
+
+    private static FileSystemOwnershipCheck checker(File...dirs)
+    {
+        return checker(() -> 
Arrays.stream(dirs).map(File::absolutePath).collect(Collectors.toList()));
+    }
+
+    private static FileSystemOwnershipCheck checker(String...dirs)
+    {
+        return checker(() -> Arrays.asList(dirs));
+    }
+
+    public static String makeRandomString(int length)
+    {
+        Random random = new Random();
+        char[] chars = new char[length];
+        for (int i = 0; i < length; ++i)
+            chars[i] = (char) ('a' + random.nextInt('z' - 'a' + 1));
+        return new String(chars);
+    }
+
+    protected void cleanTempDir()
+    {
+        if (tempDir != null && tempDir.exists())
+            delete(tempDir);
+    }
+
+    private void delete(File file)
+    {
+        file.trySetReadable(true);
+        file.trySetWritable(true);
+        file.trySetExecutable(true);
+        File[] files = file.tryList();
+        if (files != null)
+        {
+            for (File child : files)
+            {
+                delete(child);
+            }
+        }
+        file.delete();
+    }
+
+    @After
+    public void teardown() throws IOException
+    {
+        cleanTempDir();
+    }
+
+    // tests for enabling/disabling/configuring the check
+    @Test
+    public void skipCheckDisabledIfSystemPropertyIsEmpty() throws Exception
+    {
+        // no exceptions thrown from the supplier because the check is skipped
+        options.disable(filesystem_ownership);
+        
System.clearProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE.getKey());
+        AbstractFilesystemOwnershipCheckTest.checker(() -> { throw new 
RuntimeException("FAIL"); }).execute(options);
+    }
+
+    @Test
+    public void skipCheckDisabledIfSystemPropertyIsFalseButOptionsEnabled() 
throws Exception
+    {
+        // no exceptions thrown from the supplier because the check is skipped
+        options.enable(filesystem_ownership);
+        
System.setProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE.getKey(),
 "false");
+        AbstractFilesystemOwnershipCheckTest.checker(() -> { throw new 
RuntimeException("FAIL"); }).execute(options);
+    }
+
+    @Test
+    public void checkEnabledButClusterPropertyIsEmpty()
+    {
+        
System.setProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey(),
 "");
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(tempDir),
 options, MISSING_PROPERTY, 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey());
+    }
+
+    @Test
+    public void checkEnabledButClusterPropertyIsUnset()
+    {
+        
Assume.assumeFalse(options.getConfig(filesystem_ownership).containsKey("ownership_token"));
+        
System.clearProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey());
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(tempDir),
 options, MISSING_PROPERTY, 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey());
+    }
+
+    // tests for presence/absence of files in dirs
+    @Test
+    public void noRootDirectoryPresent() throws Exception
+    {
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker("/no/such/location"),
 options, NO_OWNERSHIP_FILE, "'/no/such/location'");
+    }
+
+    @Test
+    public void noDirectoryStructureOrTokenFilePresent() throws Exception
+    {
+        // The root directory exists, but is completely empty
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(tempDir),
 options, NO_OWNERSHIP_FILE, quote(tempDir.absolutePath()));
+    }
+
+    @Test
+    public void directoryStructureButNoTokenFiles() throws Exception
+    {
+        File childDir = new File(tempDir, "cassandra/data"); //checkstyle: 
permit this instantiation
+        assertTrue(childDir.tryCreateDirectories());
+        assertTrue(childDir.exists());
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(childDir),
 options, NO_OWNERSHIP_FILE, quote(childDir.absolutePath()));
+    }
+
+    @Test
+    public void multipleFilesFoundInSameTree() throws Exception
+    {
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir, 1, token);
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir.parent(), 1, 
token);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
 options, MULTIPLE_OWNERSHIP_FILES, leafDir);
+    }
+
+    @Test
+    public void singleValidFileInEachTree() throws Exception
+    {
+        // Happy path. Each target directory has exactly 1 token file in the
+        // dir above it, they all contain the supplied token and the correct
+        // count.
+        File[] leafDirs = new File[] { 
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d1/data"),
+                                       
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d2/commitlogs"),
+                                       
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d3/hints") };
+        for (File dir : leafDirs)
+            AbstractFilesystemOwnershipCheckTest.writeFile(dir.parent(), 3, 
token);
+        
AbstractFilesystemOwnershipCheckTest.checker(leafDirs).execute(options);
+    }
+
+    @Test
+    public void multipleDirsSingleTree() throws Exception
+    {
+        // Happy path. Each target directory has exactly 1 token file in the
+        // dir above it (as they all share a single parent). Each contains
+        // the supplied token and the correct count (1 in this case).
+        File[] leafDirs = new File[] { 
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d1/data"),
+                                       
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d2/commitlogs"),
+                                       
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d3/hints") };
+        AbstractFilesystemOwnershipCheckTest.writeFile(tempDir, 1, token);
+        
AbstractFilesystemOwnershipCheckTest.checker(leafDirs).execute(options);
+    }
+
+    @Test
+    public void someDirsContainNoFile() throws Exception
+    {
+        File leafDir1 = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir1, 3, token);
+        File leafDir2 = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/commitlogs");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir2, 3, token);
+        File leafDir3 = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/hints");
+
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir1,
 leafDir2, leafDir3),
+                                                            options,
+                                                            NO_OWNERSHIP_FILE,
+                                                            
quote(leafDir3.absolutePath()));
+    }
+
+    @Test
+    public void propsFileUnreadable() throws Exception
+    {
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        File tokenFile = 
AbstractFilesystemOwnershipCheckTest.writeFile(leafDir.parent(), 1, token);
+        assertTrue(tokenFile.trySetReadable(false));
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            READ_EXCEPTION,
+                                                            
leafDir.absolutePath());
+    }
+
+    @Test
+    public void propsFileIllegalContent() throws Exception
+    {
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        File propsFile = new File(leafDir, DEFAULT_FS_OWNERSHIP_FILENAME); 
//checkstyle: permit this instantiation
+        assertTrue(propsFile.createFileIfNotExists());
+        try (OutputStream os = Files.newOutputStream(propsFile.toPath()))
+        {
+            
os.write(AbstractFilesystemOwnershipCheckTest.makeRandomString(40).getBytes());
+        }
+        assertTrue(propsFile.isReadable());
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(INVALID_PROPERTY_VALUE, VERSION),
+                                                            
leafDir.absolutePath());
+    }
+
+    @Test
+    public void propsParentDirUnreadable() throws Exception
+    {
+        // The props file itself is readable, but its dir is not
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir, 1, token);
+        assertTrue(leafDir.trySetReadable(false));
+        AbstractFilesystemOwnershipCheckTest.checker(leafDir).execute(options);
+    }
+
+    @Test
+    public void propsParentDirUntraversable() throws Exception
+    {
+        // top level dir can't be listed, so no files are found
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir.parent(), 1, 
token);
+        assertTrue(tempDir.trySetExecutable(false));
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            NO_OWNERSHIP_FILE,
+                                                            
quote(leafDir.absolutePath()));
+    }
+
+    @Test
+    public void overrideFilename() throws Exception
+    {
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        writeFile(leafDir.parent(), "other_file", 
AbstractFilesystemOwnershipCheckTest.makeProperties(1, 1, token));
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
 options, NO_OWNERSHIP_FILE, quote(leafDir.absolutePath()));
+        
System.setProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_FILENAME.getKey(),
 "other_file");
+        AbstractFilesystemOwnershipCheckTest.checker(leafDir).execute(options);
+    }
+
+    // check consistency between discovered files
+    @Test
+    public void differentTokensFoundInTrees() throws Exception
+    {
+        File file1 = 
AbstractFilesystemOwnershipCheckTest.writeFile(AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir,
 "d1/data"), 3, token);
+        File file2 = 
AbstractFilesystemOwnershipCheckTest.writeFile(AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir,
 "d2/commitlogs"), 3, token);
+        File file3 = 
AbstractFilesystemOwnershipCheckTest.writeFile(AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir,
 "d3/hints"), 3, "mismatchingtoken");
+        String errorSuffix = String.format("['%s', '%s'], ['%s']",
+                                           file1.absolutePath(),
+                                           file2.absolutePath(),
+                                           file3.absolutePath());
+
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(file1.parent(),
 file2.parent(), file3.parent()),
+                                                            options,
+                                                            
INCONSISTENT_FILES_FOUND,
+                                                            errorSuffix);
+    }
+
+    @Test
+    public void differentExpectedCountsFoundInTrees() throws Exception
+    {
+        File file1 = 
AbstractFilesystemOwnershipCheckTest.writeFile(AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir,
 "d1/data"), 1, token);
+        File file2 = 
AbstractFilesystemOwnershipCheckTest.writeFile(AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir,
 "d2/commitlogs"), 2, token);
+        File file3 = 
AbstractFilesystemOwnershipCheckTest.writeFile(AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir,
 "d3/hints"), 3, "mismatchingtoken");
+        String errorSuffix = String.format("['%s'], ['%s'], ['%s']",
+                                           file1.absolutePath(),
+                                           file2.absolutePath(),
+                                           file3.absolutePath());
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(file1.parent(),
 file2.parent(), file3.parent()),
+                                                            options,
+                                                            
INCONSISTENT_FILES_FOUND,
+                                                            errorSuffix);
+    }
+
+    // tests on property values in discovered files
+    @Test
+    public void emptyPropertiesFile() throws Exception
+    {
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, new 
Properties());
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(INVALID_PROPERTY_VALUE, VERSION),
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void missingVersionProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VOLUME_COUNT, "1");
+        p.setProperty(TOKEN, "foo");
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(INVALID_PROPERTY_VALUE, VERSION),
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void nonNumericVersionProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "abc");
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(INVALID_PROPERTY_VALUE, VERSION),
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void unsupportedVersionProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "99");
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(UNSUPPORTED_VERSION, "99"),
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void missingVolumeCountProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "1");
+        p.setProperty(TOKEN, token);
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(INVALID_PROPERTY_VALUE, VOLUME_COUNT),
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void nonNumericVolumeCountProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "1");
+        p.setProperty(VOLUME_COUNT, "bar");
+        p.setProperty(TOKEN, token);
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(INVALID_PROPERTY_VALUE, VOLUME_COUNT),
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void missingTokenProp() throws Exception
+    {
+        Properties p = new Properties();
+        p.setProperty(VERSION, "1");
+        p.setProperty(VOLUME_COUNT, "1");
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(INVALID_PROPERTY_VALUE, TOKEN),
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void emptyTokenProp() throws Exception
+    {
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir.parent(), 1, 
"");
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            
String.format(INVALID_PROPERTY_VALUE, TOKEN),
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    @Test
+    public void mismatchingTokenProp() throws Exception
+    {
+        // Ownership token file exists in parent, but content doesn't match 
property
+        File leafDir = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"cassandra/data");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir.parent(), 1, 
AbstractFilesystemOwnershipCheckTest.makeRandomString(15));
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir),
+                                                            options,
+                                                            MISMATCHING_TOKEN,
+                                                            
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
+    }
+
+    // Validate volume_count prop values match number of files found
+    @Test
+    public void expectedVolumeCountMoreThanActual() throws Exception
+    {
+        // The files on disk indicate that we should expect 2 ownership files,
+        // but we only read 1, implying a disk mount is missing
+        File[] leafDirs = new File[] { 
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d1/data"),
+                                       
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d2/commitlogs"),
+                                       
AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, "d3/hints") };
+        AbstractFilesystemOwnershipCheckTest.writeFile(tempDir, 2, token);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDirs),
 options, INVALID_FILE_COUNT);
+    }
+
+    @Test
+    public void expectedVolumeCountLessThanActual() throws Exception
+    {
+        // The files on disk indicate that we should expect 1 ownership file,
+        // but we read 2, implying a extra unexpected disk mount is mounted
+        File leafDir1 = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"d1/data");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir1, 1, token);
+        File leafDir2 = AbstractFilesystemOwnershipCheckTest.mkdirs(tempDir, 
"d2/commitlogs");
+        AbstractFilesystemOwnershipCheckTest.writeFile(leafDir2, 1, token);
+        
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(leafDir1,
 leafDir2), options, INVALID_FILE_COUNT);
+    }
+
+    private String quote(String toQuote)
+    {
+        return String.format("'%s'", toQuote);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/service/FileSystemOwnershipCheckTest.java 
b/test/unit/org/apache/cassandra/service/FileSystemOwnershipCheckTest.java
deleted file mode 100644
index 22f7268..0000000
--- a/test/unit/org/apache/cassandra/service/FileSystemOwnershipCheckTest.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.Random;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.cassandra.exceptions.StartupException;
-import org.apache.cassandra.io.util.File;
-
-import static org.apache.cassandra.service.FileSystemOwnershipCheck.*;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class FileSystemOwnershipCheckTest
-{
-    private File tempDir;
-    private String token;
-
-    @Before
-    public void setup() throws IOException
-    {
-        cleanTempDir();
-        tempDir = new File(com.google.common.io.Files.createTempDir());
-        token = makeRandomString(10);
-
-        System.setProperty(OWNERSHIP_TOKEN, token);
-        System.setProperty(ENABLE_FS_OWNERSHIP_CHECK_PROPERTY, "true");
-        System.clearProperty(FS_OWNERSHIP_FILENAME_PROPERTY);
-    }
-
-    @After
-    public void teardown() throws IOException
-    {
-        cleanTempDir();
-    }
-
-    private void cleanTempDir()
-    {
-        if (tempDir != null && tempDir.exists())
-            delete(tempDir);
-    }
-
-    private void delete(File file)
-    {
-        file.trySetReadable(true);
-        file.trySetWritable(true);
-        file.trySetExecutable(true);
-        File[] files = file.tryList();
-        if (files != null)
-        {
-            for (File child : files)
-            {
-                delete(child);
-            }
-        }
-        file.delete();
-    }
-
-    // tests for enabling/disabling/configuring the check
-    @Test
-    public void skipCheckIfDisabled() throws Exception
-    {
-        // no exceptions thrown from the supplier because the check is skipped
-        System.clearProperty(ENABLE_FS_OWNERSHIP_CHECK_PROPERTY);
-        checker(() -> { throw new RuntimeException("FAIL"); }).execute();
-    }
-
-    @Test
-    public void checkEnabledButClusterPropertyIsEmpty()
-    {
-        System.setProperty(OWNERSHIP_TOKEN, "");
-        executeAndFail(checker(tempDir), MISSING_SYSTEM_PROPERTY, 
OWNERSHIP_TOKEN);
-    }
-
-    @Test
-    public void checkEnabledButClusterPropertyIsUnset()
-    {
-        System.clearProperty(OWNERSHIP_TOKEN);
-        executeAndFail(checker(tempDir), MISSING_SYSTEM_PROPERTY, 
OWNERSHIP_TOKEN);
-    }
-
-    // tests for presence/absence of files in dirs
-    @Test
-    public void noRootDirectoryPresent() throws Exception
-    {
-        executeAndFail(checker("/no/such/location"), NO_OWNERSHIP_FILE, 
"'/no/such/location'");
-    }
-
-    @Test
-    public void noDirectoryStructureOrTokenFilePresent() throws Exception
-    {
-        // The root directory exists, but is completely empty
-        executeAndFail(checker(tempDir), NO_OWNERSHIP_FILE, 
quote(tempDir.absolutePath()));
-    }
-
-    @Test
-    public void directoryStructureButNoTokenFiles() throws Exception
-    {
-        File childDir = new File(tempDir, "cassandra/data"); //checkstyle: 
permit this instantiation
-        assertTrue(childDir.tryCreateDirectories());
-        assertTrue(childDir.exists());
-        executeAndFail(checker(childDir), NO_OWNERSHIP_FILE, 
quote(childDir.absolutePath()));
-    }
-
-    @Test
-    public void multipleFilesFoundInSameTree() throws Exception
-    {
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir, 1, token);
-        writeFile(leafDir.parent(), 1, token);
-        executeAndFail(checker(leafDir), MULTIPLE_OWNERSHIP_FILES, leafDir);
-    }
-
-    @Test
-    public void singleValidFileInEachTree() throws Exception
-    {
-        // Happy path. Each target directory has exactly 1 token file in the
-        // dir above it, they all contain the supplied token and the correct
-        // count.
-        File[] leafDirs = new File[] { mkdirs(tempDir, "d1/data"),
-                                       mkdirs(tempDir, "d2/commitlogs"),
-                                       mkdirs(tempDir, "d3/hints") };
-        for (File dir : leafDirs)
-            writeFile(dir.parent(), 3, token);
-        checker(leafDirs).execute();
-    }
-
-    @Test
-    public void multipleDirsSingleTree() throws Exception
-    {
-        // Happy path. Each target directory has exactly 1 token file in the
-        // dir above it (as they all share a single parent). Each contains
-        // the supplied token and the correct count (1 in this case).
-        File[] leafDirs = new File[] { mkdirs(tempDir, "d1/data"),
-                                       mkdirs(tempDir, "d2/commitlogs"),
-                                       mkdirs(tempDir, "d3/hints") };
-        writeFile(tempDir, 1, token);
-        checker(leafDirs).execute();
-    }
-
-    @Test
-    public void someDirsContainNoFile() throws Exception
-    {
-        File leafDir1 = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir1, 3, token);
-        File leafDir2 = mkdirs(tempDir, "cassandra/commitlogs");
-        writeFile(leafDir2, 3, token);
-        File leafDir3 = mkdirs(tempDir, "cassandra/hints");
-
-        executeAndFail(checker(leafDir1, leafDir2, leafDir3),
-                       NO_OWNERSHIP_FILE,
-                       quote(leafDir3.absolutePath()));
-    }
-
-    @Test
-    public void propsFileUnreadable() throws Exception
-    {
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        File tokenFile = writeFile(leafDir.parent(), 1, token);
-        assertTrue(tokenFile.trySetReadable(false));
-        executeAndFail(checker(leafDir),
-                       READ_EXCEPTION,
-                       leafDir.absolutePath());
-    }
-
-    @Test
-    public void propsFileIllegalContent() throws Exception
-    {
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        File propsFile = new File(leafDir, DEFAULT_FS_OWNERSHIP_FILENAME); 
//checkstyle: permit this instantiation
-        assertTrue(propsFile.createFileIfNotExists());
-        try (OutputStream os = Files.newOutputStream(propsFile.toPath()))
-        {
-            os.write(makeRandomString(40).getBytes());
-        }
-        assertTrue(propsFile.isReadable());
-        executeAndFail(checker(leafDir),
-                       String.format(INVALID_PROPERTY_VALUE, VERSION),
-                       leafDir.absolutePath());
-    }
-
-    @Test
-    public void propsParentDirUnreadable() throws Exception
-    {
-        // The props file itself is readable, but its dir is not
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir, 1, token);
-        assertTrue(leafDir.trySetReadable(false));
-        checker(leafDir).execute();
-    }
-
-    @Test
-    public void propsParentDirUntraversable() throws Exception
-    {
-        // top level dir can't be listed, so no files are found
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), 1, token);
-        assertTrue(tempDir.trySetExecutable(false));
-        executeAndFail(checker(leafDir),
-                       NO_OWNERSHIP_FILE,
-                       quote(leafDir.absolutePath()));
-    }
-
-    @Test
-    public void overrideFilename() throws Exception
-    {
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), "other_file", makeProperties(1, 1, token));
-        executeAndFail(checker(leafDir), NO_OWNERSHIP_FILE, 
quote(leafDir.absolutePath()));
-        System.setProperty(FS_OWNERSHIP_FILENAME_PROPERTY, "other_file");
-        checker(leafDir).execute();
-    }
-
-    // check consistency between discovered files
-    @Test
-    public void differentTokensFoundInTrees() throws Exception
-    {
-        File file1 = writeFile(mkdirs(tempDir, "d1/data"), 3, token);
-        File file2 = writeFile(mkdirs(tempDir, "d2/commitlogs"), 3, token);
-        File file3 = writeFile(mkdirs(tempDir, "d3/hints"), 3, 
"mismatchingtoken");
-        String errorSuffix = String.format("['%s', '%s'], ['%s']",
-                                           file1.absolutePath(),
-                                           file2.absolutePath(),
-                                           file3.absolutePath());
-
-        executeAndFail(checker(file1.parent(), file2.parent(), file3.parent()),
-                       INCONSISTENT_FILES_FOUND,
-                       errorSuffix);
-    }
-
-    @Test
-    public void differentExpectedCountsFoundInTrees() throws Exception
-    {
-        File file1 = writeFile(mkdirs(tempDir, "d1/data"), 1, token);
-        File file2 = writeFile(mkdirs(tempDir, "d2/commitlogs"), 2, token);
-        File file3 = writeFile(mkdirs(tempDir, "d3/hints"), 3, 
"mismatchingtoken");
-        String errorSuffix = String.format("['%s'], ['%s'], ['%s']",
-                                           file1.absolutePath(),
-                                           file2.absolutePath(),
-                                           file3.absolutePath());
-        executeAndFail(checker(file1.parent(), file2.parent(), file3.parent()),
-                       INCONSISTENT_FILES_FOUND,
-                       errorSuffix);
-    }
-
-    // tests on property values in discovered files
-    @Test
-    public void emptyPropertiesFile() throws Exception
-    {
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, new 
Properties());
-        executeAndFail(checker(leafDir),
-                       String.format(INVALID_PROPERTY_VALUE, VERSION),
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-    @Test
-    public void missingVersionProp() throws Exception
-    {
-        Properties p = new Properties();
-        p.setProperty(VOLUME_COUNT, "1");
-        p.setProperty(TOKEN, "foo");
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
-        executeAndFail(checker(leafDir),
-                       String.format(INVALID_PROPERTY_VALUE, VERSION),
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-    @Test
-    public void nonNumericVersionProp() throws Exception
-    {
-        Properties p = new Properties();
-        p.setProperty(VERSION, "abc");
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
-        executeAndFail(checker(leafDir),
-                       String.format(INVALID_PROPERTY_VALUE, VERSION),
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-    @Test
-    public void unsupportedVersionProp() throws Exception
-    {
-        Properties p = new Properties();
-        p.setProperty(VERSION, "99");
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
-        executeAndFail(checker(leafDir),
-                       String.format(UNSUPPORTED_VERSION, "99"),
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-    @Test
-    public void missingVolumeCountProp() throws Exception
-    {
-        Properties p = new Properties();
-        p.setProperty(VERSION, "1");
-        p.setProperty(TOKEN, token);
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
-        executeAndFail(checker(leafDir),
-                       String.format(INVALID_PROPERTY_VALUE, VOLUME_COUNT),
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-    @Test
-    public void nonNumericVolumeCountProp() throws Exception
-    {
-        Properties p = new Properties();
-        p.setProperty(VERSION, "1");
-        p.setProperty(VOLUME_COUNT, "bar");
-        p.setProperty(TOKEN, token);
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
-        executeAndFail(checker(leafDir),
-                       String.format(INVALID_PROPERTY_VALUE, VOLUME_COUNT),
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-    @Test
-    public void missingTokenProp() throws Exception
-    {
-        Properties p = new Properties();
-        p.setProperty(VERSION, "1");
-        p.setProperty(VOLUME_COUNT, "1");
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), DEFAULT_FS_OWNERSHIP_FILENAME, p);
-        executeAndFail(checker(leafDir),
-                       String.format(INVALID_PROPERTY_VALUE, TOKEN),
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-    @Test
-    public void emptyTokenProp() throws Exception
-    {
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), 1, "");
-        executeAndFail(checker(leafDir),
-                       String.format(INVALID_PROPERTY_VALUE, TOKEN),
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-    @Test
-    public void mismatchingTokenProp() throws Exception
-    {
-        // Ownership token file exists in parent, but content doesn't match 
property
-        File leafDir = mkdirs(tempDir, "cassandra/data");
-        writeFile(leafDir.parent(), 1, makeRandomString(15));
-        executeAndFail(checker(leafDir),
-                       MISMATCHING_TOKEN,
-                       
leafDir.parent().toPath().resolve(DEFAULT_FS_OWNERSHIP_FILENAME));
-    }
-
-
-    // Validate volume_count prop values match number of files found
-    @Test
-    public void expectedVolumeCountMoreThanActual() throws Exception
-    {
-        // The files on disk indicate that we should expect 2 ownership files,
-        // but we only read 1, implying a disk mount is missing
-        File[] leafDirs = new File[] { mkdirs(tempDir, "d1/data"),
-                                       mkdirs(tempDir, "d2/commitlogs"),
-                                       mkdirs(tempDir, "d3/hints") };
-        writeFile(tempDir, 2, token);
-        executeAndFail(checker(leafDirs), INVALID_FILE_COUNT);
-    }
-
-    @Test
-    public void expectedVolumeCountLessThanActual() throws Exception
-    {
-        // The files on disk indicate that we should expect 1 ownership file,
-        // but we read 2, implying a extra unexpected disk mount is mounted
-        File leafDir1 = mkdirs(tempDir, "d1/data");
-        writeFile(leafDir1, 1, token);
-        File leafDir2 = mkdirs(tempDir, "d2/commitlogs");
-        writeFile(leafDir2, 1, token);
-        executeAndFail(checker(leafDir1, leafDir2), INVALID_FILE_COUNT);
-    }
-
-    private static void executeAndFail(FileSystemOwnershipCheck checker, 
String messageTemplate, Object...messageArgs)
-    {
-        try
-        {
-            checker.execute();
-            fail("Expected an exception but none thrown");
-        } catch (StartupException e) {
-            String expected = ERROR_PREFIX + String.format(messageTemplate, 
messageArgs);
-            assertEquals(expected, e.getMessage());
-        }
-    }
-
-    private static Properties makeProperties(int version, int volumeCount, 
String token)
-    {
-        Properties props = new Properties();
-        props.setProperty(VERSION, Integer.toString(version));
-        props.setProperty(VOLUME_COUNT, Integer.toString(volumeCount));
-        props.setProperty(TOKEN, token);
-        return props;
-    }
-
-    private static File writeFile(File dir, int volumeCount, String token) 
throws IOException
-    {
-        return writeFile(dir, DEFAULT_FS_OWNERSHIP_FILENAME, 1, volumeCount, 
token);
-    }
-
-    private static File writeFile(File dir, final String filename, int 
version, int volumeCount, String token)
-    throws IOException
-    {
-        return writeFile(dir, filename, makeProperties(version, volumeCount, 
token));
-    }
-
-    private static File writeFile(File dir, String filename, Properties props) 
throws IOException
-    {
-        File tokenFile = new File(dir, filename); //checkstyle: permit this 
instantiation
-        assertTrue(tokenFile.createFileIfNotExists());
-        try (OutputStream os = Files.newOutputStream(tokenFile.toPath()))
-        {
-            props.store(os, "Test properties");
-        }
-        assertTrue(tokenFile.isReadable());
-        return tokenFile;
-    }
-
-    private static File mkdirs(File parent, String path)
-    {
-        File childDir = new File(parent, path); //checkstyle: permit this 
instantiation
-        assertTrue(childDir.tryCreateDirectories());
-        assertTrue(childDir.exists());
-        return childDir;
-    }
-
-    private static FileSystemOwnershipCheck checker(Supplier<Iterable<String>> 
dirs)
-    {
-        return new FileSystemOwnershipCheck(dirs);
-    }
-
-    private static FileSystemOwnershipCheck checker(File...dirs)
-    {
-        return checker(() -> 
Arrays.stream(dirs).map(File::absolutePath).collect(Collectors.toList()));
-    }
-
-    private static FileSystemOwnershipCheck checker(String...dirs)
-    {
-        return checker(() -> Arrays.asList(dirs));
-    }
-
-    public static String makeRandomString(int length)
-    {
-        Random random = new Random();
-        char[] chars = new char[length];
-        for (int i = 0; i < length; ++i)
-            chars[i] = (char) ('a' + random.nextInt('z' - 'a' + 1));
-        return new String(chars);
-    }
-
-    private String quote(String toQuote)
-    {
-        return String.format("'%s'", toQuote);
-    }
-}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/StartupChecksTest.java 
b/test/unit/org/apache/cassandra/service/StartupChecksTest.java
index a422a42..2c15fdc 100644
--- a/test/unit/org/apache/cassandra/service/StartupChecksTest.java
+++ b/test/unit/org/apache/cassandra/service/StartupChecksTest.java
@@ -22,6 +22,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+import org.apache.cassandra.config.StartupChecksOptions;
 import org.apache.cassandra.io.util.File;
 import org.junit.*;
 
@@ -42,6 +43,8 @@ public class StartupChecksTest
     StartupChecks startupChecks;
     Path sstableDir;
 
+    StartupChecksOptions options = new StartupChecksOptions();
+
     @BeforeClass
     public static void setupServer()
     {
@@ -82,14 +85,14 @@ public class StartupChecksTest
         FileUtils.deleteRecursive(new File(sstableDir));
         Path snapshotDir = sstableDir.resolve("snapshots");
         Files.createDirectories(snapshotDir);
-        copyInvalidLegacySSTables(snapshotDir); startupChecks.verify();
+        copyInvalidLegacySSTables(snapshotDir); startupChecks.verify(options);
 
         // and in a backups directory
         FileUtils.deleteRecursive(new File(sstableDir));
         Path backupDir = sstableDir.resolve("backups");
         Files.createDirectories(backupDir);
         copyInvalidLegacySSTables(backupDir);
-        startupChecks.verify();
+        startupChecks.verify(options);
     }
 
     @Test
@@ -100,7 +103,7 @@ public class StartupChecksTest
         copyLegacyNonSSTableFiles(sstableDir);
         assertFalse(new File(sstableDir).tryList().length == 0);
 
-        startupChecks.verify();
+        startupChecks.verify(options);
     }
 
     @Test
@@ -109,7 +112,7 @@ public class StartupChecksTest
         // This test just validates if the verify function
         // doesn't throw any exceptions
         startupChecks = 
startupChecks.withTest(StartupChecks.checkReadAheadKbSetting);
-        startupChecks.verify();
+        startupChecks.verify(options);
     }
 
     @Test
@@ -132,7 +135,7 @@ public class StartupChecksTest
     public void maxMapCountCheck() throws Exception
     {
         startupChecks = startupChecks.withTest(StartupChecks.checkMaxMapCount);
-        startupChecks.verify();
+        startupChecks.verify(options);
     }
 
     private void copyLegacyNonSSTableFiles(Path targetDir) throws IOException
@@ -161,7 +164,7 @@ public class StartupChecksTest
     {
         try
         {
-            tests.verify();
+            tests.verify(options);
             fail("Expected a startup exception but none was thrown");
         }
         catch (StartupException e)
diff --git 
a/test/unit/org/apache/cassandra/service/SystemPropertiesBasedFileSystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/SystemPropertiesBasedFileSystemOwnershipCheckTest.java
new file mode 100644
index 0000000..cb54f81
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/SystemPropertiesBasedFileSystemOwnershipCheckTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.junit.Before;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+
+public class SystemPropertiesBasedFileSystemOwnershipCheckTest extends 
AbstractFilesystemOwnershipCheckTest
+{
+    @Before
+    public void setup()
+    {
+        super.setup();
+        
System.setProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey(),
 token);
+        
System.setProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE.getKey(),
 "true");
+    }
+}
\ No newline at end of file
diff --git 
a/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
new file mode 100644
index 0000000..cc00fd6
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.junit.Before;
+
+import static 
org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.filesystem_ownership;
+
+public class YamlBasedFileSystemOwnershipCheckTest extends 
AbstractFilesystemOwnershipCheckTest
+{
+    @Before
+    public void setup()
+    {
+        super.setup();
+        options.getConfig(filesystem_ownership).put(ENABLED_PROPERTY, "true");
+        options.getConfig(filesystem_ownership).put("ownership_token", token);
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to