This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 62948a8c52 CEP-15: (C*) Enhance in-memory FileSystem to work with mmap 
and support tests to add custom logic
62948a8c52 is described below

commit 62948a8c525cee5eb2115e9f2441342b48f4cb89
Author: David Capwell <[email protected]>
AuthorDate: Mon May 8 11:56:37 2023 -0700

    CEP-15: (C*) Enhance in-memory FileSystem to work with mmap and support 
tests to add custom logic
    
    patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18485
---
 .../org/apache/cassandra/hints/HintsCatalog.java   |   2 +
 src/java/org/apache/cassandra/io/util/File.java    |  19 +
 .../org/apache/cassandra/utils/ByteBufferUtil.java |  13 +
 src/java/org/apache/cassandra/utils/SyncUtil.java  |   8 +
 .../cassandra/distributed/impl/Instance.java       |   5 +-
 .../cassandra/simulator/ClusterSimulation.java     |  47 +-
 .../apache/cassandra/auth/GrantAndRevokeTest.java  |   2 +
 .../apache/cassandra/cql3/CDCStatementTest.java    |   2 +
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  52 +-
 .../cassandra/cql3/CustomNowInSecondsTest.java     |   3 +
 .../apache/cassandra/cql3/GcCompactionTest.java    |   2 +-
 .../org/apache/cassandra/cql3/KeyCacheCqlTest.java |   2 +-
 .../cassandra/cql3/PreparedStatementsTest.java     |   2 +
 .../cql3/selection/SelectionColumnMappingTest.java |   3 +
 .../validation/entities/FrozenCollectionsTest.java |   2 +
 .../cql3/validation/entities/JsonTest.java         |   2 +
 .../cql3/validation/entities/UserTypesTest.java    |   3 +
 .../cql3/validation/entities/VirtualTableTest.java |   3 +
 .../miscellaneous/SSTablesIteratedTest.java        |   2 +-
 .../InsertUpdateIfConditionCollectionsTest.java    |   3 +
 .../InsertUpdateIfConditionStaticsTest.java        |   2 +
 .../operations/InsertUpdateIfConditionTest.java    |   2 +
 .../validation/operations/SelectLimitTest.java     |   3 +
 .../cassandra/db/CorruptPrimaryIndexTest.java      |  56 ++
 .../unit/org/apache/cassandra/db/KeyspaceTest.java |   2 +-
 .../commitlog/CommitLogSegmentManagerCDCTest.java  |   3 +
 .../db/memtable/MemtableSizeTestBase.java          |   2 +
 .../db/virtual/CredentialsCacheKeysTableTest.java  |   3 +
 .../NetworkPermissionsCacheKeysTableTest.java      |   3 +
 .../db/virtual/PermissionsCacheKeysTableTest.java  |   2 +
 .../db/virtual/RolesCacheKeysTableTest.java        |   2 +
 .../io/filesystem/ForwardingFileChannel.java       | 146 ++++
 .../io/filesystem/ForwardingFileSystem.java        | 129 ++++
 .../filesystem/ForwardingFileSystemProvider.java   | 246 ++++++
 .../cassandra/io/filesystem/ForwardingPath.java    | 234 ++++++
 .../io/filesystem/ListenableFileSystem.java        | 856 +++++++++++++++++++++
 .../org/apache/cassandra/io/util/FileSystems.java  |  89 +++
 .../schema/RemoveWithoutDroppingTest.java          |   3 +
 .../cassandra/service/PartitionDenylistTest.java   |   3 +
 39 files changed, 1931 insertions(+), 32 deletions(-)

diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java 
b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index ecde896b90..6bc0030924 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -168,6 +168,8 @@ final class HintsCatalog
         }
         else
         {
+            if (SyncUtil.SKIP_SYNC)
+                return;
             logger.error("Unable to open directory {}", 
hintsDirectory.absolutePath());
             FileUtils.handleFSErrorAndPropagate(new FSWriteError(new 
IOException(String.format("Unable to open hint directory %s", 
hintsDirectory.absolutePath())), hintsDirectory.absolutePath()));
         }
diff --git a/src/java/org/apache/cassandra/io/util/File.java 
b/src/java/org/apache/cassandra/io/util/File.java
index c9bc97c3a9..de415388ed 100644
--- a/src/java/org/apache/cassandra/io/util/File.java
+++ b/src/java/org/apache/cassandra/io/util/File.java
@@ -38,6 +38,7 @@ import java.util.function.Predicate;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 
 import net.openhft.chronicle.core.util.ThrowingFunction;
@@ -125,6 +126,18 @@ public class File implements Comparable<File>
         if (!path.isAbsolute() || path.isOpaque()) throw new 
IllegalArgumentException();
     }
 
+    /**
+     * Unsafe constructor that allows a File to use a different {@link 
FileSystem} than {@link File#filesystem}.
+     *
+     * The main callers of this constructor are cases such as JVM Dtest functions 
that need access to the logging framework
+     * files, which exist only in {@link FileSystems#getDefault()}.
+     */
+    @VisibleForTesting
+    public File(FileSystem fs, String first, String... more)
+    {
+        this.path = fs.getPath(first, more);
+    }
+
     /**
      * @param path the path to wrap
      */
@@ -777,6 +790,12 @@ public class File implements Comparable<File>
         return path;
     }
 
+    @VisibleForTesting
+    public static FileSystem unsafeGetFilesystem()
+    {
+        return filesystem;
+    }
+
     public static void unsafeSetFilesystem(FileSystem fs)
     {
         filesystem = fs;
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java 
b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 615138e7d2..4b740578a3 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -24,10 +24,12 @@ package org.apache.cassandra.utils;
  */
 
 import java.io.DataInput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -934,6 +936,17 @@ public class ByteBufferUtil
         return true;
     }
 
+    public static void readFully(FileChannel channel, ByteBuffer dst, long 
position) throws IOException
+    {
+        while (dst.hasRemaining())
+        {
+            int read = channel.read(dst, position);
+            if (read == -1)
+                throw new EOFException();
+            position += read;
+        }
+    }
+
     public static <T> ByteBuffer serialized(IVersionedSerializer<T> 
serializer, T value, int version)
     {
         try (DataOutputBuffer dob = new DataOutputBuffer())
diff --git a/src/java/org/apache/cassandra/utils/SyncUtil.java 
b/src/java/org/apache/cassandra/utils/SyncUtil.java
index 6055859531..081a5f1e16 100644
--- a/src/java/org/apache/cassandra/utils/SyncUtil.java
+++ b/src/java/org/apache/cassandra/utils/SyncUtil.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.config.Config;
 
 import com.google.common.base.Preconditions;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,6 +101,12 @@ public class SyncUtil
     public static MappedByteBuffer force(MappedByteBuffer buf)
     {
         Preconditions.checkNotNull(buf);
+        Object attachment = MemoryUtil.getAttachment(buf);
+        if (attachment instanceof Runnable)
+        {
+            ((Runnable) attachment).run();
+            return buf;
+        }
         if (SKIP_SYNC)
         {
             Object fd = null;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 348bde4666..84cf5cf1fc 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -25,6 +25,7 @@ import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
 import java.security.Permission;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -221,10 +222,10 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         String suite = System.getProperty("suitename", 
"suitename_IS_UNDEFINED");
         String clusterId = ClusterIDDefiner.getId();
         String instanceId = InstanceIDDefiner.getInstanceId();
-        File f = new 
File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, 
clusterId, instanceId));
+        File f = new File(FileSystems.getDefault(), 
String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, 
instanceId));
         // when creating a cluster globally in a test class we get the logs 
without the suite, try finding those logs:
         if (!f.exists())
-            f = new File(String.format("build/test/logs/%s/%s/%s/system.log", 
tag, clusterId, instanceId));
+            f = new File(FileSystems.getDefault(), 
String.format("build/test/logs/%s/%s/%s/system.log", tag, clusterId, 
instanceId));
         if (!f.exists())
             throw new AssertionError("Unable to locate system.log under " + 
new File("build/test/logs").absolutePath() + "; make sure ICluster.setup() is 
called or extend TestBaseImpl and do not define a static beforeClass function 
with @BeforeClass");
         return new FileLogAction(f);
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java 
b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
index ac3a773453..b0257c62b2 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
@@ -21,8 +21,8 @@ package org.apache.cassandra.simulator;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.nio.file.FileSystem;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
@@ -35,8 +35,6 @@ import java.util.function.LongConsumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
-import com.google.common.jimfs.Configuration;
-import com.google.common.jimfs.Jimfs;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 
@@ -55,6 +53,7 @@ import 
org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnab
 import org.apache.cassandra.distributed.impl.DirectStreamingConnectionFactory;
 import org.apache.cassandra.distributed.impl.IsolatedExecutor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.FileSystems;
 import org.apache.cassandra.service.paxos.BallotGenerator;
 import org.apache.cassandra.service.paxos.PaxosPrepare;
 import org.apache.cassandra.simulator.RandomSource.Choices;
@@ -62,6 +61,7 @@ import 
org.apache.cassandra.simulator.asm.InterceptAsClassTransformer;
 import org.apache.cassandra.simulator.asm.NemesisFieldSelectors;
 import org.apache.cassandra.simulator.cluster.ClusterActions;
 import org.apache.cassandra.simulator.cluster.ClusterActions.TopologyChange;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
 import org.apache.cassandra.simulator.systems.Failures;
 import 
org.apache.cassandra.simulator.systems.InterceptedWait.CaptureSites.Capture;
 import org.apache.cassandra.simulator.systems.InterceptibleThread;
@@ -605,7 +605,7 @@ public class ClusterSimulation<S extends Simulation> 
implements AutoCloseable
     public final SimulatedSystems simulated;
     public final Cluster cluster;
     public final S simulation;
-    private final FileSystem jimfs;
+    private final ListenableFileSystem fs;
     protected final Map<Integer, List<Closeable>> onUnexpectedShutdown = new 
TreeMap<>();
     protected final List<Callable<Void>> onShutdown = new 
CopyOnWriteArrayList<>();
     protected final ThreadLocalRandomCheck threadLocalRandomCheck;
@@ -616,9 +616,7 @@ public class ClusterSimulation<S extends Simulation> 
implements AutoCloseable
                              SimulationFactory<S> factory) throws IOException
     {
         this.random = random;
-        this.jimfs  = Jimfs.newFileSystem(Long.toHexString(seed) + 'x' + 
uniqueNum, Configuration.unix().toBuilder()
-                                                                               
.setMaxSize(4L << 30).setBlockSize(512)
-                                                                               
.build());
+        this.fs = new 
ListenableFileSystem(FileSystems.jimfs(Long.toHexString(seed) + 'x' + 
uniqueNum));
 
         final SimulatedMessageDelivery delivery;
         final SimulatedExecution execution;
@@ -671,23 +669,28 @@ public class ClusterSimulation<S extends Simulation> 
implements AutoCloseable
 
         Failures failures = new Failures();
         ThreadAllocator threadAllocator = new ThreadAllocator(random, 
builder.threadCount, numOfNodes);
+        List<String> allowedDiskAccessModes = Arrays.asList("mmap", 
"mmap_index_only", "standard");
+        String disk_access_mode = allowedDiskAccessModes.get(random.uniform(0, 
allowedDiskAccessModes.size() - 1));
+        boolean commitlogCompressed = random.decide(.5f);
         cluster = snitch.setup(Cluster.build(numOfNodes)
-                         .withRoot(jimfs.getPath("/cassandra"))
+                         .withRoot(fs.getPath("/cassandra"))
                          .withSharedClasses(sharedClassPredicate)
-                         .withConfig(config -> 
configUpdater.accept(threadAllocator.update(config
-                             .with(Feature.BLANK_GOSSIP)
-                             .set("read_request_timeout", 
String.format("%dms", NANOSECONDS.toMillis(builder.readTimeoutNanos)))
-                             .set("write_request_timeout", 
String.format("%dms", NANOSECONDS.toMillis(builder.writeTimeoutNanos)))
-                             .set("cas_contention_timeout", 
String.format("%dms", NANOSECONDS.toMillis(builder.contentionTimeoutNanos)))
-                             .set("request_timeout", String.format("%dms", 
NANOSECONDS.toMillis(builder.requestTimeoutNanos)))
-                             .set("memtable_heap_space", "1MiB")
-                             .set("memtable_allocation_type", 
builder.memoryListener != null ? "unslabbed_heap_buffers_logged" : 
"heap_buffers")
-                             .set("file_cache_size", "16MiB")
-                             .set("use_deterministic_table_id", true)
-                             .set("disk_access_mode", "standard")
-                             .set("failure_detector", 
SimulatedFailureDetector.Instance.class.getName())
-                             .set("commitlog_compression", new 
ParameterizedClass(LZ4Compressor.class.getName(), emptyMap()))
-                         )))
+                         .withConfig(config -> {
+                             config.with(Feature.BLANK_GOSSIP)
+                                   .set("read_request_timeout", 
String.format("%dms", NANOSECONDS.toMillis(builder.readTimeoutNanos)))
+                                   .set("write_request_timeout", 
String.format("%dms", NANOSECONDS.toMillis(builder.writeTimeoutNanos)))
+                                   .set("cas_contention_timeout", 
String.format("%dms", NANOSECONDS.toMillis(builder.contentionTimeoutNanos)))
+                                   .set("request_timeout", 
String.format("%dms", NANOSECONDS.toMillis(builder.requestTimeoutNanos)))
+                                   .set("memtable_heap_space", "1MiB")
+                                   .set("memtable_allocation_type", 
builder.memoryListener != null ? "unslabbed_heap_buffers_logged" : 
"heap_buffers")
+                                   .set("file_cache_size", "16MiB")
+                                   .set("use_deterministic_table_id", true)
+                                   .set("disk_access_mode", disk_access_mode)
+                                   .set("failure_detector", 
SimulatedFailureDetector.Instance.class.getName());
+                             if (commitlogCompressed)
+                                 config.set("commitlog_compression", new 
ParameterizedClass(LZ4Compressor.class.getName(), emptyMap()));
+                             
configUpdater.accept(threadAllocator.update(config));
+                         })
                          .withInstanceInitializer(new IInstanceInitializer()
                          {
                              @Override
diff --git a/test/unit/org/apache/cassandra/auth/GrantAndRevokeTest.java 
b/test/unit/org/apache/cassandra/auth/GrantAndRevokeTest.java
index 5c1c2a0298..eeff052ed0 100644
--- a/test/unit/org/apache/cassandra/auth/GrantAndRevokeTest.java
+++ b/test/unit/org/apache/cassandra/auth/GrantAndRevokeTest.java
@@ -22,6 +22,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
@@ -36,6 +37,7 @@ public class GrantAndRevokeTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
         DatabaseDescriptor.setPermissionsValidity(0);
         CQLTester.setUpClass();
         requireAuthentication();
diff --git a/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java 
b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
index fb24aa99c2..55865b1573 100644
--- a/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
+++ b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
@@ -22,6 +22,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 public class CDCStatementTest extends CQLTester
@@ -29,6 +30,7 @@ public class CDCStatementTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
         DatabaseDescriptor.setCDCEnabled(true);
         CQLTester.setUpClass();
     }
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 92ffecb75c..c532d8b8f9 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -69,13 +69,17 @@ import org.apache.cassandra.auth.AuthTestUtils;
 import org.apache.cassandra.auth.IRoleManager;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileSystems;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
@@ -192,8 +196,6 @@ public abstract class CQLTester
 
         nativeAddr = InetAddress.getLoopbackAddress();
         nativePort = getAutomaticallyAllocatedPort(nativeAddr);
-
-        ServerTestUtils.daemonInitialization();
     }
 
     private List<String> keyspaces = new ArrayList<>();
@@ -319,6 +321,8 @@ public abstract class CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         if (ROW_CACHE_SIZE_IN_MIB > 0)
             DatabaseDescriptor.setRowCacheSizeInMiB(ROW_CACHE_SIZE_IN_MIB);
         
StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
@@ -1333,7 +1337,7 @@ public abstract class CQLTester
         return QueryProcessor.instance.prepare(formatQuery(query), 
ClientState.forInternalCalls());
     }
 
-    protected UntypedResultSet execute(String query, Object... values) throws 
Throwable
+    protected UntypedResultSet execute(String query, Object... values)
     {
         return executeFormattedQuery(formatQuery(query), values);
     }
@@ -1347,7 +1351,7 @@ public abstract class CQLTester
      * Executes the provided query using the {@link 
ClientState#forInternalCalls()} as the expected ClientState. Note:
      * this means permissions checking will not apply and queries will proceed 
regardless of role or guardrails.
      */
-    protected UntypedResultSet executeFormattedQuery(String query, Object... 
values) throws Throwable
+    protected UntypedResultSet executeFormattedQuery(String query, Object... 
values)
     {
         UntypedResultSet rs;
         if (usePrepared)
@@ -2414,4 +2418,44 @@ public abstract class CQLTester
                 && Objects.equal(password, u.password);
         }
     }
+
+    public static abstract class InMemory extends CQLTester
+    {
+        protected static ListenableFileSystem fs = null;
+
+        /**
+         * Used by {@link #cleanupFileSystemListeners()} to know if file 
system listeners should be removed at the start
+         * of a test; can be disabled for cases where listeners are needed across 
multiple tests.
+         */
+        protected boolean cleanupFileSystemListeners = true;
+
+        @BeforeClass
+        public static void setUpClass()
+        {
+            fs = FileSystems.newGlobalInMemoryFileSystem();
+            
CassandraRelevantProperties.IGNORE_MISSING_NATIVE_FILE_HINTS.setBoolean(true);
+            FileSystems.maybeCreateTmp();
+
+            CQLTester.setUpClass();
+        }
+        @Before
+        public void cleanupFileSystemListeners()
+        {
+            if (!cleanupFileSystemListeners)
+                return;
+            fs.clearListeners();
+        }
+
+        protected ListenableFileSystem.PathFilter 
isCurrentTableIndexFile(String keyspace)
+        {
+            return path -> {
+                if (!path.getFileName().toString().endsWith("Index.db"))
+                    return false;
+                Descriptor desc = Descriptor.fromFile(new File(path));
+                if (!desc.ksname.equals(keyspace) && 
desc.cfname.equals(currentTable()))
+                    return false;
+                return true;
+            };
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java 
b/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
index 8768b77ca9..ce9b103c67 100644
--- a/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -43,6 +44,8 @@ public class CustomNowInSecondsTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         prepareServer();
         requireNetwork();
     }
diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java 
b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
index eded145c66..91111d19d0 100644
--- a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
@@ -55,7 +55,7 @@ public class GcCompactionTest extends CQLTester
     }
 
     @Override
-    protected UntypedResultSet execute(String query, Object... values) throws 
Throwable
+    protected UntypedResultSet execute(String query, Object... values)
     {
         return executeFormattedQuery(formatQuery(KEYSPACE_PER_TEST, query), 
values);
     }
diff --git a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java 
b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
index d40b001e90..779b39eea3 100644
--- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@ -120,7 +120,7 @@ public class KeyCacheCqlTest extends CQLTester
     }
 
     @Override
-    protected UntypedResultSet execute(String query, Object... values) throws 
Throwable
+    protected UntypedResultSet execute(String query, Object... values)
     {
         return executeFormattedQuery(formatQuery(KEYSPACE_PER_TEST, query), 
values);
     }
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java 
b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index e6a41dd70f..efc4793c9d 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -34,6 +34,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.SyntaxError;
 import com.datastax.driver.core.exceptions.WriteTimeoutException;
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
@@ -60,6 +61,7 @@ public class PreparedStatementsTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
         DatabaseDescriptor.setAccordTransactionsEnabled(true);
         CQLTester.setUpClass();
     }
diff --git 
a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java 
b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index 659f34a737..d36c0e2bd2 100644
--- 
a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -54,6 +55,8 @@ public class SelectionColumnMappingTest extends CQLTester
     @BeforeClass
     public static void setUpClass()     // overrides CQLTester.setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         
DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
 
         prepareServer();
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
index 5cbfb4cff5..c4cdca1e57 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.db.marshal.*;
@@ -43,6 +44,7 @@ public class FrozenCollectionsTest extends CQLTester
     @BeforeClass
     public static void setUpClass()     // overrides CQLTester.setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
         // Selecting partitioner for a table is not exposed on CREATE TABLE.
         
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
 
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
index 3f5408fed4..502192c274 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.validation.entities;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.CQLTester;
@@ -50,6 +51,7 @@ public class JsonTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
         if (ROW_CACHE_SIZE_IN_MIB > 0)
             DatabaseDescriptor.setRowCacheSizeInMiB(ROW_CACHE_SIZE_IN_MIB);
 
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
index 1520b4cab9..0d99494979 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -32,6 +33,8 @@ public class UserTypesTest extends CQLTester
     @BeforeClass
     public static void setUpClass()     // overrides CQLTester.setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         // Selecting partitioner for a table is not exposed on CREATE TABLE.
         
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
 
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
index a17f74cec2..b2da6e59ca 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
@@ -39,6 +39,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.datastax.driver.core.exceptions.InvalidQueryException;
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.marshal.Int32Type;
@@ -208,6 +209,8 @@ public class VirtualTableTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         TableMetadata vt1Metadata = TableMetadata.builder(KS_NAME, VT1_NAME)
                 .kind(TableMetadata.Kind.VIRTUAL)
                 .addPartitionKeyColumn("pk", UTF8Type.instance)
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
index 83d12f507e..3c7ce37d04 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
@@ -79,7 +79,7 @@ public class SSTablesIteratedTest extends CQLTester
     }
 
     @Override
-    protected UntypedResultSet execute(String query, Object... values) throws 
Throwable
+    protected UntypedResultSet execute(String query, Object... values)
     {
         return executeFormattedQuery(formatQuery(KEYSPACE_PER_TEST, query), 
values);
     }
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
index c0a5139b5c..fe4073c19c 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
@@ -51,6 +52,8 @@ public class InsertUpdateIfConditionCollectionsTest extends 
CQLTester
     @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
     public static Collection<Object[]> data()
     {
+        ServerTestUtils.daemonInitialization();
+
         return InsertUpdateIfConditionTest.data();
     }
 
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
index b9c9ce33cb..3270fc157a 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.CQLTester;
 
 /* InsertUpdateIfConditionCollectionsTest class has been split into multiple 
ones because of timeout issues (CASSANDRA-16670)
@@ -47,6 +48,7 @@ public class InsertUpdateIfConditionStaticsTest extends 
CQLTester
     @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
     public static Collection<Object[]> data()
     {
+        ServerTestUtils.daemonInitialization();
         return InsertUpdateIfConditionTest.data();
     }
 
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index 8429ec0b3d..c617544684 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.Duration;
@@ -61,6 +62,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
     @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
     public static Collection<Object[]> data()
     {
+        ServerTestUtils.daemonInitialization();
         return Arrays.asList(new Object[]{ "3.0", (Runnable) () -> {
                                  Pair<Boolean, CassandraVersion> res = 
Gossiper.instance.isUpgradingFromVersionLowerThanC17653(new 
CassandraVersion("3.11"));
                                  assertTrue(debugMsgCASSANDRA17653(res), 
res.left);
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
index 2e419e1f1b..347bc1d445 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.cql3.validation.operations;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -34,6 +35,8 @@ public class SelectLimitTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
         
DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
 
diff --git a/test/unit/org/apache/cassandra/db/CorruptPrimaryIndexTest.java 
b/test/unit/org/apache/cassandra/db/CorruptPrimaryIndexTest.java
new file mode 100644
index 0000000000..c46e96ceec
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/CorruptPrimaryIndexTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Documents how a single corrupted byte in the primary index file currently surfaces to a reader:
+ * the read for the affected partition returns no rows rather than reporting corruption.
+ * <p>
+ * The test relies on the {@code fs} handle and helpers it inherits from {@link CQLTester.InMemory}
+ * to intercept reads of the index file.
+ */
+public class CorruptPrimaryIndexTest extends CQLTester.InMemory
+{
+    /**
+     * Installs a post-read listener that overwrites one byte of the partition key as the index file
+     * is read, then writes and flushes a single row and asserts that selecting it yields zero rows.
+     */
+    @Test
+    public void bigPrimaryIndexDoesNotDetectDiskCorruption()
+    {
+        // Set listener early, before the file is opened; mmap access cannot be listened to,
+        // so need to observe the open, which happens on flush
+        fs.onPostRead(isCurrentTableIndexFile(keyspace()), (path, channel, 
position, dst, read) -> {
+            // Reading the Primary index for the test!
+            // format
+            // 2 bytes: length of bytes for PK
+            // 4 bytes: pk as an int32
+            // variable bytes (see
+            // org.apache.cassandra.io.sstable.format.big.RowIndexEntry.IndexSerializer.deserialize(org.apache.cassandra.io.util.FileDataInput))
+            assertThat(position).describedAs("Unexpected access, should start 
read from start of file").isEqualTo(0);
+
+            // simulate bit rot by having 1 byte change... but make sure it's the pk!
+            // (index 2 is the first byte after the 2-byte length prefix described above)
+            dst.put(2, Byte.MAX_VALUE);
+        });
+
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, value int)");
+        execute("INSERT INTO %s (id, value) VALUES (?, ?)", 0, 0);
+        flush();
+
+        UntypedResultSet rs = execute("SELECT * FROM %s WHERE id=?", 0);
+        // this assert check is here to get the test to be green... if the format is fixed and
+        // this data loss is not happening anymore, then this check should be updated
+        assertThatThrownBy(() -> assertRows(rs, row(0, 0))).hasMessage("Got 
less rows than expected. Expected 1 but got 0");
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java 
b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index a2fa9e9528..dab306de7e 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -61,7 +61,7 @@ public class KeyspaceTest extends CQLTester
     }
 
     @Override
-    protected UntypedResultSet execute(String query, Object... values) throws 
Throwable
+    protected UntypedResultSet execute(String query, Object... values)
     {
         return executeFormattedQuery(formatQuery(KEYSPACE_PER_TEST, query), 
values);
     }
diff --git 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index 3789b51714..5f6f235ece 100644
--- 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@ -27,6 +27,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import 
com.google.monitoring.runtime.instrumentation.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileReader;
 import org.junit.Assert;
@@ -51,6 +52,8 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         DatabaseDescriptor.setCDCEnabled(true);
         DatabaseDescriptor.setCDCTotalSpaceInMiB(1024);
         CQLTester.setUpClass();
diff --git 
a/test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java 
b/test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java
index d817dbf939..77605279b9 100644
--- a/test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java
+++ b/test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java
@@ -30,6 +30,7 @@ import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -76,6 +77,7 @@ public abstract class MemtableSizeTestBase extends CQLTester
 
     public static void setup(Config.MemtableAllocationType allocationType)
     {
+        ServerTestUtils.daemonInitialization();
         try
         {
             Field confField = 
DatabaseDescriptor.class.getDeclaredField("conf");
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
index 45132a962e..7aa1ec4fc6 100644
--- 
a/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
+++ 
b/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
@@ -26,6 +26,7 @@ import org.junit.Test;
 
 import com.datastax.driver.core.EndPoint;
 import com.datastax.driver.core.PlainTextAuthProvider;
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.auth.AuthTestUtils;
 import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.auth.IAuthenticator;
@@ -48,6 +49,8 @@ public class CredentialsCacheKeysTableTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         // high value is used for convenient debugging
         DatabaseDescriptor.setCredentialsValidity(20_000);
 
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
 
b/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
index f944884098..e935bbb214 100644
--- 
a/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
+++ 
b/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
@@ -24,6 +24,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.auth.AuthTestUtils;
 import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.auth.DCPermissions;
@@ -46,6 +47,8 @@ public class NetworkPermissionsCacheKeysTableTest extends 
CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         // high value is used for convenient debugging
         DatabaseDescriptor.setPermissionsValidity(20_000);
 
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
index 2171b2d035..429517521f 100644
--- 
a/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
+++ 
b/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
@@ -28,6 +28,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.auth.AuthTestUtils;
 import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.auth.DataResource;
@@ -52,6 +53,7 @@ public class PermissionsCacheKeysTableTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
         // high value is used for convenient debugging
         DatabaseDescriptor.setPermissionsValidity(20_000);
 
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
index 40c3037005..de14aefd63 100644
--- a/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
@@ -24,6 +24,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.auth.AuthTestUtils;
 import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.auth.IRoleManager;
@@ -46,6 +47,7 @@ public class RolesCacheKeysTableTest extends CQLTester
     @BeforeClass
     public static void setUpClass()
     {
+        ServerTestUtils.daemonInitialization();
         // high value is used for convenient debugging
         DatabaseDescriptor.setRolesValidity(20_000);
 
diff --git 
a/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileChannel.java 
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileChannel.java
new file mode 100644
index 0000000000..4fecd24d38
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileChannel.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A {@link FileChannel} that forwards every operation to another channel.
+ * <p>
+ * Intended as a base class: subclasses override only the operations they want to observe or alter
+ * and inherit plain delegation for the rest. All forwarding goes through {@link #delegate()} so a
+ * subclass may also swap the target channel.
+ */
+public class ForwardingFileChannel extends FileChannel
+{
+    protected final FileChannel delegate;
+
+    /**
+     * @param delegate the channel that receives every forwarded call
+     */
+    public ForwardingFileChannel(FileChannel delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    /**
+     * @return the channel all operations are forwarded to
+     */
+    protected FileChannel delegate()
+    {
+        return delegate;
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException
+    {
+        return delegate().read(dst);
+    }
+
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException
+    {
+        return delegate().read(dsts, offset, length);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException
+    {
+        return delegate().write(src);
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
+    {
+        return delegate().write(srcs, offset, length);
+    }
+
+    @Override
+    public long position() throws IOException
+    {
+        return delegate().position();
+    }
+
+    /**
+     * Repositions the delegate and returns this wrapper.
+     *
+     * @return {@code this}, never the delegate
+     */
+    @Override
+    public FileChannel position(long newPosition) throws IOException
+    {
+        // FileChannel#position(long) is specified to return "this file channel". Returning the delegate's
+        // result would hand callers the unwrapped channel, so chained calls would bypass subclass overrides.
+        delegate().position(newPosition);
+        return this;
+    }
+
+    @Override
+    public long size() throws IOException
+    {
+        return delegate().size();
+    }
+
+    /**
+     * Truncates the delegate and returns this wrapper.
+     *
+     * @return {@code this}, never the delegate
+     */
+    @Override
+    public FileChannel truncate(long size) throws IOException
+    {
+        // same reasoning as position(long): keep callers on the wrapper
+        delegate().truncate(size);
+        return this;
+    }
+
+    @Override
+    public void force(boolean metaData) throws IOException
+    {
+        delegate().force(metaData);
+    }
+
+    @Override
+    public long transferTo(long position, long count, WritableByteChannel target) throws IOException
+    {
+        return delegate().transferTo(position, count, target);
+    }
+
+    @Override
+    public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException
+    {
+        return delegate().transferFrom(src, position, count);
+    }
+
+    @Override
+    public int read(ByteBuffer dst, long position) throws IOException
+    {
+        return delegate().read(dst, position);
+    }
+
+    @Override
+    public int write(ByteBuffer src, long position) throws IOException
+    {
+        return delegate().write(src, position);
+    }
+
+    @Override
+    public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException
+    {
+        return delegate().map(mode, position, size);
+    }
+
+    // NOTE(review): the FileLock returned by lock/tryLock is created by the delegate, so its channel()
+    // presumably reports the delegate rather than this wrapper -- confirm whether any caller relies on that.
+    @Override
+    public FileLock lock(long position, long size, boolean shared) throws IOException
+    {
+        return delegate().lock(position, size, shared);
+    }
+
+    @Override
+    public FileLock tryLock(long position, long size, boolean shared) throws IOException
+    {
+        return delegate().tryLock(position, size, shared);
+    }
+
+    @Override
+    protected void implCloseChannel() throws IOException
+    {
+        // .close(), and .isOpen() are "final", so need to leverage implCloseChannel
+        // to close the underlying channel
+        delegate().close();
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystem.java 
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystem.java
new file mode 100644
index 0000000000..39c682ca7f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystem.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.nio.file.WatchService;
+import java.nio.file.attribute.UserPrincipalLookupService;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * A {@link FileSystem} whose operations are all answered by another file system.
+ * <p>
+ * Paths handed back to callers pass through {@link #wrap(Path)} and paths received from callers pass
+ * through {@link #unwrap(Path)}; both are identity functions here and exist as extension points.
+ */
+public class ForwardingFileSystem extends FileSystem
+{
+    protected final FileSystem delegate;
+
+    /**
+     * @param delegate the file system that answers every call
+     */
+    public ForwardingFileSystem(FileSystem delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    /**
+     * @return the file system calls are forwarded to
+     */
+    protected FileSystem delegate()
+    {
+        return delegate;
+    }
+
+    /**
+     * Converts a delegate path into the path exposed to callers; identity by default.
+     */
+    protected Path wrap(Path p)
+    {
+        return p;
+    }
+
+    /**
+     * Converts a caller-supplied path into one the delegate understands; identity by default.
+     */
+    protected Path unwrap(Path p)
+    {
+        return p;
+    }
+
+    @Override
+    public FileSystemProvider provider()
+    {
+        return delegate().provider();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        delegate().close();
+    }
+
+    @Override
+    public boolean isOpen()
+    {
+        return delegate().isOpen();
+    }
+
+    @Override
+    public boolean isReadOnly()
+    {
+        return delegate().isReadOnly();
+    }
+
+    @Override
+    public String getSeparator()
+    {
+        return delegate().getSeparator();
+    }
+
+    @Override
+    public Iterable<Path> getRootDirectories()
+    {
+        // lazily wrap each root as it is iterated
+        Iterable<Path> roots = delegate().getRootDirectories();
+        return Iterables.transform(roots, root -> wrap(root));
+    }
+
+    @Override
+    public Iterable<FileStore> getFileStores()
+    {
+        return delegate().getFileStores();
+    }
+
+    @Override
+    public Set<String> supportedFileAttributeViews()
+    {
+        return delegate().supportedFileAttributeViews();
+    }
+
+    @Override
+    public Path getPath(String first, String... more)
+    {
+        Path raw = delegate().getPath(first, more);
+        return wrap(raw);
+    }
+
+    @Override
+    public PathMatcher getPathMatcher(String syntaxAndPattern)
+    {
+        final PathMatcher underlying = delegate().getPathMatcher(syntaxAndPattern);
+        // the delegate's matcher only understands delegate paths, so unwrap before matching
+        return new PathMatcher()
+        {
+            @Override
+            public boolean matches(Path candidate)
+            {
+                return underlying.matches(unwrap(candidate));
+            }
+        };
+    }
+
+    @Override
+    public UserPrincipalLookupService getUserPrincipalLookupService()
+    {
+        return delegate().getUserPrincipalLookupService();
+    }
+
+    @Override
+    public WatchService newWatchService() throws IOException
+    {
+        return delegate().newWatchService();
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystemProvider.java
 
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystemProvider.java
new file mode 100644
index 0000000000..fd1f6d90b2
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystemProvider.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.AccessMode;
+import java.nio.file.CopyOption;
+import java.nio.file.DirectoryStream;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * A {@link FileSystemProvider} that forwards every operation to another provider.
+ * <p>
+ * Paths received from callers are passed through {@link #unwrap(Path)} before reaching the delegate,
+ * and paths produced by the delegate are passed through {@link #wrap(Path)} before being returned.
+ * Both are identity functions here; subclasses that expose their own {@link Path} type override them.
+ */
+public class ForwardingFileSystemProvider extends FileSystemProvider
+{
+    protected final FileSystemProvider delegate;
+
+    /**
+     * @param delegate the provider that performs every operation
+     */
+    public ForwardingFileSystemProvider(FileSystemProvider delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    /**
+     * @return the provider calls are forwarded to
+     */
+    protected FileSystemProvider delegate()
+    {
+        return delegate;
+    }
+
+    /**
+     * Converts a delegate path into the path exposed to callers; identity by default.
+     */
+    protected Path wrap(Path p)
+    {
+        return p;
+    }
+
+    /**
+     * Converts a caller-supplied path into one the delegate understands; identity by default.
+     */
+    protected Path unwrap(Path p)
+    {
+        return p;
+    }
+
+    @Override
+    public String getScheme()
+    {
+        return delegate().getScheme();
+    }
+
+    @Override
+    public FileSystem newFileSystem(URI uri, Map<String, ?> env) throws IOException
+    {
+        return delegate().newFileSystem(uri, env);
+    }
+
+    @Override
+    public FileSystem getFileSystem(URI uri)
+    {
+        return delegate().getFileSystem(uri);
+    }
+
+    @Override
+    public Path getPath(URI uri)
+    {
+        return wrap(delegate().getPath(uri));
+    }
+
+    @Override
+    public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException
+    {
+        return delegate().newByteChannel(unwrap(path), options, attrs);
+    }
+
+    @Override
+    public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException
+    {
+        DirectoryStream<Path> stream = delegate().newDirectoryStream(unwrap(dir), filter);
+        // re-expose the delegate's stream so that each entry is wrapped as it is iterated
+        return new DirectoryStream<Path>()
+        {
+            @Override
+            public Iterator<Path> iterator()
+            {
+                return Iterators.transform(stream.iterator(), ForwardingFileSystemProvider.this::wrap);
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+                stream.close();
+            }
+        };
+    }
+
+    @Override
+    public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException
+    {
+        delegate().createDirectory(unwrap(dir), attrs);
+    }
+
+    @Override
+    public void delete(Path path) throws IOException
+    {
+        delegate().delete(unwrap(path));
+    }
+
+    @Override
+    public void copy(Path source, Path target, CopyOption... options) throws IOException
+    {
+        delegate().copy(unwrap(source), unwrap(target), options);
+    }
+
+    @Override
+    public void move(Path source, Path target, CopyOption... options) throws IOException
+    {
+        delegate().move(unwrap(source), unwrap(target), options);
+    }
+
+    @Override
+    public boolean isSameFile(Path path, Path path2) throws IOException
+    {
+        return delegate().isSameFile(unwrap(path), unwrap(path2));
+    }
+
+    @Override
+    public boolean isHidden(Path path) throws IOException
+    {
+        return delegate().isHidden(unwrap(path));
+    }
+
+    @Override
+    public FileStore getFileStore(Path path) throws IOException
+    {
+        return delegate().getFileStore(unwrap(path));
+    }
+
+    @Override
+    public void checkAccess(Path path, AccessMode... modes) throws IOException
+    {
+        delegate().checkAccess(unwrap(path), modes);
+    }
+
+    @Override
+    public <V extends FileAttributeView> V getFileAttributeView(Path path, Class<V> type, LinkOption... options)
+    {
+        return delegate().getFileAttributeView(unwrap(path), type, options);
+    }
+
+    @Override
+    public <A extends BasicFileAttributes> A readAttributes(Path path, Class<A> type, LinkOption... options) throws IOException
+    {
+        return delegate().readAttributes(unwrap(path), type, options);
+    }
+
+    @Override
+    public Map<String, Object> readAttributes(Path path, String attributes, LinkOption... options) throws IOException
+    {
+        return delegate().readAttributes(unwrap(path), attributes, options);
+    }
+
+    @Override
+    public void setAttribute(Path path, String attribute, Object value, LinkOption... options) throws IOException
+    {
+        delegate().setAttribute(unwrap(path), attribute, value, options);
+    }
+
+    @Override
+    public FileSystem newFileSystem(Path path, Map<String, ?> env) throws IOException
+    {
+        return delegate().newFileSystem(unwrap(path), env);
+    }
+
+    @Override
+    public InputStream newInputStream(Path path, OpenOption... options) throws IOException
+    {
+        return delegate().newInputStream(unwrap(path), options);
+    }
+
+    @Override
+    public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException
+    {
+        return delegate().newOutputStream(unwrap(path), options);
+    }
+
+    @Override
+    public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException
+    {
+        return delegate().newFileChannel(unwrap(path), options, attrs);
+    }
+
+    @Override
+    public AsynchronousFileChannel newAsynchronousFileChannel(Path path, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs) throws IOException
+    {
+        return delegate().newAsynchronousFileChannel(unwrap(path), options, executor, attrs);
+    }
+
+    @Override
+    public void createSymbolicLink(Path link, Path target, FileAttribute<?>... attrs) throws IOException
+    {
+        // target must be unwrapped too: the delegate can only interpret its own Path type, exactly as
+        // for the second argument of copy/move/createLink
+        delegate().createSymbolicLink(unwrap(link), unwrap(target), attrs);
+    }
+
+    @Override
+    public void createLink(Path link, Path existing) throws IOException
+    {
+        delegate().createLink(unwrap(link), unwrap(existing));
+    }
+
+    @Override
+    public boolean deleteIfExists(Path path) throws IOException
+    {
+        return delegate().deleteIfExists(unwrap(path));
+    }
+
+    @Override
+    public Path readSymbolicLink(Path link) throws IOException
+    {
+        return wrap(delegate().readSymbolicLink(unwrap(link)));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/io/filesystem/ForwardingPath.java 
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingPath.java
new file mode 100644
index 0000000000..7dd0037325
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/filesystem/ForwardingPath.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * A {@link Path} that forwards every operation to a delegate {@link Path}.
+ * <p>
+ * Paths handed to the delegate go through {@link #unwrap(Path)} and paths returned from the delegate go
+ * through {@link #wrap(Path)}. In this base class both are identity functions; subclasses override them
+ * to convert between their own path type and the delegate's.
+ */
+public class ForwardingPath implements Path
+{
+    protected final Path delegate;
+
+    public ForwardingPath(Path delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    /** Returns the path that all calls are forwarded to. */
+    protected Path delegate()
+    {
+        return delegate;
+    }
+
+    /** Converts a path produced by the delegate into the type exposed to callers; identity by default. */
+    protected Path wrap(Path a)
+    {
+        return a;
+    }
+
+    /** Converts a path supplied by a caller into the type the delegate understands; identity by default. */
+    protected Path unwrap(Path p)
+    {
+        return p;
+    }
+
+    /**
+     * Wraps {@code p} unless it is {@code null}.
+     * <p>
+     * {@link Path#getRoot()}, {@link Path#getFileName()} and {@link Path#getParent()} are documented to
+     * return {@code null}; an overridden {@link #wrap(Path)} must not be asked to wrap that.
+     */
+    private Path wrapNullable(Path p)
+    {
+        return p == null ? null : wrap(p);
+    }
+
+    @Override
+    public FileSystem getFileSystem()
+    {
+        return delegate().getFileSystem();
+    }
+
+    @Override
+    public boolean isAbsolute()
+    {
+        return delegate().isAbsolute();
+    }
+
+    @Override
+    public Path getRoot()
+    {
+        // null when the delegate has no root component
+        return wrapNullable(delegate().getRoot());
+    }
+
+    @Override
+    public Path getFileName()
+    {
+        // null when the delegate has zero name elements
+        return wrapNullable(delegate().getFileName());
+    }
+
+    @Override
+    public Path getParent()
+    {
+        return wrapNullable(delegate().getParent());
+    }
+
+    @Override
+    public int getNameCount()
+    {
+        return delegate().getNameCount();
+    }
+
+    @Override
+    public Path getName(int index)
+    {
+        return wrap(delegate().getName(index));
+    }
+
+    @Override
+    public Path subpath(int beginIndex, int endIndex)
+    {
+        return wrap(delegate().subpath(beginIndex, endIndex));
+    }
+
+    @Override
+    public boolean startsWith(Path other)
+    {
+        return delegate().startsWith(unwrap(other));
+    }
+
+    @Override
+    public boolean startsWith(String other)
+    {
+        return delegate().startsWith(other);
+    }
+
+    @Override
+    public boolean endsWith(Path other)
+    {
+        return delegate().endsWith(unwrap(other));
+    }
+
+    @Override
+    public boolean endsWith(String other)
+    {
+        return delegate().endsWith(other);
+    }
+
+    @Override
+    public Path normalize()
+    {
+        return wrap(delegate().normalize());
+    }
+
+    @Override
+    public Path resolve(Path other)
+    {
+        return wrap(delegate().resolve(unwrap(other)));
+    }
+
+    @Override
+    public Path resolve(String other)
+    {
+        return wrap(delegate().resolve(other));
+    }
+
+    @Override
+    public Path resolveSibling(Path other)
+    {
+        return wrap(delegate().resolveSibling(unwrap(other)));
+    }
+
+    @Override
+    public Path resolveSibling(String other)
+    {
+        return wrap(delegate().resolveSibling(other));
+    }
+
+    @Override
+    public Path relativize(Path other)
+    {
+        return wrap(delegate().relativize(unwrap(other)));
+    }
+
+    @Override
+    public URI toUri()
+    {
+        return delegate().toUri();
+    }
+
+    @Override
+    public Path toAbsolutePath()
+    {
+        return wrap(delegate().toAbsolutePath());
+    }
+
+    @Override
+    public Path toRealPath(LinkOption... options) throws IOException
+    {
+        return wrap(delegate().toRealPath(options));
+    }
+
+    @Override
+    public File toFile()
+    {
+        return delegate().toFile();
+    }
+
+    @Override
+    public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events, WatchEvent.Modifier... modifiers) throws IOException
+    {
+        return delegate().register(watcher, events, modifiers);
+    }
+
+    @Override
+    public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events) throws IOException
+    {
+        return delegate().register(watcher, events);
+    }
+
+    @Override
+    public Iterator<Path> iterator()
+    {
+        // each name element produced by the delegate is wrapped lazily as it is consumed
+        return Iterators.transform(delegate().iterator(), this::wrap);
+    }
+
+    @Override
+    public int compareTo(Path other)
+    {
+        return delegate().compareTo(unwrap(other));
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        // compare against the unwrapped form so a wrapper equals another wrapper of an equal delegate
+        return delegate().equals(obj instanceof Path ? unwrap((Path) obj) : obj);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return delegate().hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return delegate().toString();
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/io/filesystem/ListenableFileSystem.java 
b/test/unit/org/apache/cassandra/io/filesystem/ListenableFileSystem.java
new file mode 100644
index 0000000000..659d34dfe2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/filesystem/ListenableFileSystem.java
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+public class ListenableFileSystem extends ForwardingFileSystem
+{
+    /**
+     * Predicate over a {@link Path} that may throw {@link IOException}; the filtering {@code onXxx}
+     * overloads use it to decide whether a callback is invoked for a given path.
+     */
+    @FunctionalInterface
+    public interface PathFilter
+    {
+        boolean accept(Path entry) throws IOException;
+    }
+
+    /**
+     * Marker super-interface of every listener type accepted by {@code listen(Listener)}.
+     */
+    public interface Listener
+    {
+    }
+
+    /**
+     * Listener notified before a {@link FileChannel} is opened for {@code path}.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPreOpen extends Listener
+    {
+        void preOpen(Path path, Set<? extends OpenOption> options, FileAttribute<?>[] attrs) throws IOException;
+    }
+
+    /**
+     * Listener notified after a {@link FileChannel} has been opened for {@code path}; {@code channel} is the
+     * channel that is about to be returned to the caller.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPostOpen extends Listener
+    {
+        void postOpen(Path path, Set<? extends OpenOption> options, FileAttribute<?>[] attrs, FileChannel channel) throws IOException;
+    }
+
+    /**
+     * Listener notified before a read into {@code dst}; {@code position} is the file position the read starts at.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPreRead extends Listener
+    {
+        void preRead(Path path, FileChannel channel, long position, ByteBuffer dst) throws IOException;
+    }
+
+    /**
+     * Listener notified after a read into {@code dst}; {@code read} is the value returned by the read call.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPostRead extends Listener
+    {
+        void postRead(Path path, FileChannel channel, long position, ByteBuffer dst, int read) throws IOException;
+    }
+
+    /**
+     * Listener notified before {@code transferTo(position, count, target)} is forwarded to the underlying channel.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPreTransferTo extends Listener
+    {
+        void preTransferTo(Path path, FileChannel channel, long position, long count, WritableByteChannel target) throws IOException;
+    }
+
+    /**
+     * Listener notified after {@code transferTo}; {@code transfered} is the value the transfer returned.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPostTransferTo extends Listener
+    {
+        void postTransferTo(Path path, FileChannel channel, long position, long count, WritableByteChannel target, long transfered) throws IOException;
+    }
+
+    /**
+     * Listener notified before {@code transferFrom(src, position, count)} is forwarded to the underlying channel.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPreTransferFrom extends Listener
+    {
+        void preTransferFrom(Path path, FileChannel channel, ReadableByteChannel src, long position, long count) throws IOException;
+    }
+
+    /**
+     * Listener notified after {@code transferFrom}; {@code transfered} is the value the transfer returned.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPostTransferFrom extends Listener
+    {
+        void postTransferFrom(Path path, FileChannel channel, ReadableByteChannel src, long position, long count, long transfered) throws IOException;
+    }
+
+    /**
+     * Listener notified before a write of {@code src}; {@code position} is the file position the write starts at.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPreWrite extends Listener
+    {
+        void preWrite(Path path, FileChannel channel, long position, ByteBuffer src) throws IOException;
+    }
+
+    /**
+     * Listener notified after a write of {@code src}; {@code wrote} is the value returned by the write call.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPostWrite extends Listener
+    {
+        void postWrite(Path path, FileChannel channel, long position, ByteBuffer src, int wrote) throws IOException;
+    }
+
+    /**
+     * Listener notified before the channel position is changed; {@code position} is the position read before
+     * the change and {@code newPosition} is the requested one.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPrePosition extends Listener
+    {
+        void prePosition(Path path, FileChannel channel, long position, long newPosition) throws IOException;
+    }
+
+    /**
+     * Listener notified after the channel position has been changed; {@code position} is the position read
+     * before the change and {@code newPosition} is the requested one.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPostPosition extends Listener
+    {
+        void postPosition(Path path, FileChannel channel, long position, long newPosition) throws IOException;
+    }
+
+    /**
+     * Listener notified before a truncate; {@code size} is the channel size read before truncating and
+     * {@code targetSize} is the requested size.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPreTruncate extends Listener
+    {
+        void preTruncate(Path path, FileChannel channel, long size, long targetSize) throws IOException;
+    }
+
+    /**
+     * Listener notified after a truncate; {@code size} is the channel size read before truncating,
+     * {@code targetSize} the requested size and {@code newSize} the size read after truncating.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPostTruncate extends Listener
+    {
+        void postTruncate(Path path, FileChannel channel, long size, long targetSize, long newSize) throws IOException;
+    }
+
+    /**
+     * Listener notified before {@code force(metaData)} is forwarded to the underlying channel.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPreForce extends Listener
+    {
+        void preForce(Path path, FileChannel channel, boolean metaData) throws IOException;
+    }
+
+    /**
+     * Listener notified after {@code force(metaData)} has been forwarded to the underlying channel.
+     * Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface OnPostForce extends Listener
+    {
+        void postForce(Path path, FileChannel channel, boolean metaData) throws IOException;
+    }
+
+    /**
+     * Handle returned when a listener is registered; closing it removes that listener again.
+     * Narrows {@link AutoCloseable#close()} so that it declares no checked exception, and declares a single
+     * method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    public interface Unsubscribable extends AutoCloseable
+    {
+        @Override
+        void close();
+    }
+
+    private final List<OnPreOpen> onPreOpen = new CopyOnWriteArrayList<>();
+    private final List<OnPostOpen> onPostOpen = new CopyOnWriteArrayList<>();
+    private final List<OnPreTransferTo> onPreTransferTo = new 
CopyOnWriteArrayList<>();
+    private final List<OnPostTransferTo> onPostTransferTo = new 
CopyOnWriteArrayList<>();
+    private final List<OnPreRead> onPreRead = new CopyOnWriteArrayList<>();
+    private final List<OnPostRead> onPostRead = new CopyOnWriteArrayList<>();
+    private final List<OnPreWrite> onPreWrite = new CopyOnWriteArrayList<>();
+    private final List<OnPostWrite> onPostWrite = new CopyOnWriteArrayList<>();
+    private final List<OnPreTransferFrom> onPreTransferFrom = new 
CopyOnWriteArrayList<>();
+    private final List<OnPostTransferFrom> onPostTransferFrom = new 
CopyOnWriteArrayList<>();
+
+    private final List<OnPreForce> onPreForce = new CopyOnWriteArrayList<>();
+    private final List<OnPostForce> onPostForce = new CopyOnWriteArrayList<>();
+    private final List<OnPreTruncate> onPreTruncate = new 
CopyOnWriteArrayList<>();
+    private final List<OnPostTruncate> onPostTruncate = new 
CopyOnWriteArrayList<>();
+    private final List<OnPrePosition> onPrePosition = new 
CopyOnWriteArrayList<>();
+    private final List<OnPostPosition> onPostPosition = new 
CopyOnWriteArrayList<>();
+    private final List<List<? extends Listener>> lists = 
Arrays.asList(onPreOpen, onPostOpen,
+                                                                       
onPreRead, onPostRead,
+                                                                       
onPreTransferTo, onPostTransferTo,
+                                                                       
onPreWrite, onPostWrite,
+                                                                       
onPreTransferFrom, onPostTransferFrom,
+                                                                       
onPreForce, onPostForce,
+                                                                       
onPreTruncate, onPostTruncate,
+                                                                       
onPrePosition, onPostPosition);
+    private final ListenableFileSystemProvider provider;
+
+    public ListenableFileSystem(FileSystem delegate)
+    {
+        super(delegate);
+        this.provider = new ListenableFileSystemProvider(super.provider());
+    }
+
+    /**
+     * Subscribes {@code listener} to every event type whose listener interface it implements.
+     *
+     * @param listener the listener to register; it may implement several listener interfaces at once
+     * @return a handle that removes the listener from every list it was added to when closed
+     * @throws IllegalArgumentException if the listener implements none of the supported listener interfaces
+     */
+    public Unsubscribable listen(Listener listener)
+    {
+        List<List<? extends Listener>> matches = new ArrayList<>(1);
+        addIfInstance(listener, OnPreOpen.class, onPreOpen, matches);
+        addIfInstance(listener, OnPostOpen.class, onPostOpen, matches);
+        addIfInstance(listener, OnPreRead.class, onPreRead, matches);
+        addIfInstance(listener, OnPostRead.class, onPostRead, matches);
+        addIfInstance(listener, OnPreTransferTo.class, onPreTransferTo, matches);
+        addIfInstance(listener, OnPostTransferTo.class, onPostTransferTo, matches);
+        addIfInstance(listener, OnPreWrite.class, onPreWrite, matches);
+        addIfInstance(listener, OnPostWrite.class, onPostWrite, matches);
+        addIfInstance(listener, OnPreTransferFrom.class, onPreTransferFrom, matches);
+        addIfInstance(listener, OnPostTransferFrom.class, onPostTransferFrom, matches);
+        addIfInstance(listener, OnPreForce.class, onPreForce, matches);
+        addIfInstance(listener, OnPostForce.class, onPostForce, matches);
+        addIfInstance(listener, OnPreTruncate.class, onPreTruncate, matches);
+        addIfInstance(listener, OnPostTruncate.class, onPostTruncate, matches);
+        addIfInstance(listener, OnPrePosition.class, onPrePosition, matches);
+        addIfInstance(listener, OnPostPosition.class, onPostPosition, matches);
+        if (!matches.isEmpty())
+            return () -> remove(matches, listener);
+        throw new IllegalArgumentException("Unable to find a listenable type for " + listener.getClass());
+    }
+
+    /** Adds {@code listener} to {@code registry}, and records {@code registry} in {@code matches}, when it is a {@code type}. */
+    private static <T extends Listener> void addIfInstance(Listener listener, Class<T> type, List<T> registry, List<List<? extends Listener>> matches)
+    {
+        if (!type.isInstance(listener))
+            return;
+        registry.add(type.cast(listener));
+        matches.add(registry);
+    }
+
+    public Unsubscribable onPreOpen(OnPreOpen callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a pre-open listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPreOpen(PathFilter filter, OnPreOpen callback)
+    {
+        OnPreOpen filtered = (p, opts, attributes) -> {
+            if (!filter.accept(p))
+                return;
+            callback.preOpen(p, opts, attributes);
+        };
+        return onPreOpen(filtered);
+    }
+
+    public Unsubscribable onPostOpen(OnPostOpen callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a post-open listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPostOpen(PathFilter filter, OnPostOpen callback)
+    {
+        OnPostOpen filtered = (p, opts, attributes, ch) -> {
+            if (!filter.accept(p))
+                return;
+            callback.postOpen(p, opts, attributes, ch);
+        };
+        return onPostOpen(filtered);
+    }
+
+    public Unsubscribable onPreRead(OnPreRead callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a pre-read listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPreRead(PathFilter filter, OnPreRead callback)
+    {
+        OnPreRead filtered = (p, ch, pos, buffer) -> {
+            if (!filter.accept(p))
+                return;
+            callback.preRead(p, ch, pos, buffer);
+        };
+        return onPreRead(filtered);
+    }
+
+    public Unsubscribable onPostRead(OnPostRead callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a post-read listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPostRead(PathFilter filter, OnPostRead callback)
+    {
+        OnPostRead filtered = (p, ch, pos, buffer, count) -> {
+            if (!filter.accept(p))
+                return;
+            callback.postRead(p, ch, pos, buffer, count);
+        };
+        return onPostRead(filtered);
+    }
+
+    public Unsubscribable onPreTransferTo(OnPreTransferTo callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a pre-transferTo listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPreTransferTo(PathFilter filter, OnPreTransferTo callback)
+    {
+        OnPreTransferTo filtered = (p, ch, pos, length, sink) -> {
+            if (!filter.accept(p))
+                return;
+            callback.preTransferTo(p, ch, pos, length, sink);
+        };
+        return onPreTransferTo(filtered);
+    }
+
+    public Unsubscribable onPostTransferTo(OnPostTransferTo callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a post-transferTo listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPostTransferTo(PathFilter filter, OnPostTransferTo callback)
+    {
+        OnPostTransferTo filtered = (p, ch, pos, length, sink, moved) -> {
+            if (!filter.accept(p))
+                return;
+            callback.postTransferTo(p, ch, pos, length, sink, moved);
+        };
+        return onPostTransferTo(filtered);
+    }
+
+    public Unsubscribable onPreTransferFrom(OnPreTransferFrom callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a pre-transferFrom listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPreTransferFrom(PathFilter filter, OnPreTransferFrom callback)
+    {
+        OnPreTransferFrom filtered = (p, ch, source, pos, length) -> {
+            if (!filter.accept(p))
+                return;
+            callback.preTransferFrom(p, ch, source, pos, length);
+        };
+        return onPreTransferFrom(filtered);
+    }
+
+    public Unsubscribable onPostTransferFrom(OnPostTransferFrom callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a post-transferFrom listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPostTransferFrom(PathFilter filter, OnPostTransferFrom callback)
+    {
+        OnPostTransferFrom filtered = (p, ch, source, pos, length, moved) -> {
+            if (!filter.accept(p))
+                return;
+            callback.postTransferFrom(p, ch, source, pos, length, moved);
+        };
+        return onPostTransferFrom(filtered);
+    }
+
+    public Unsubscribable onPreWrite(OnPreWrite callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a pre-write listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPreWrite(PathFilter filter, OnPreWrite callback)
+    {
+        OnPreWrite filtered = (p, ch, pos, buffer) -> {
+            if (!filter.accept(p))
+                return;
+            callback.preWrite(p, ch, pos, buffer);
+        };
+        return onPreWrite(filtered);
+    }
+
+    public Unsubscribable onPostWrite(OnPostWrite callback)
+    {
+        return listen(callback);
+    }
+
+    /**
+     * Registers {@code callback} as a post-write listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPostWrite(PathFilter filter, OnPostWrite callback)
+    {
+        OnPostWrite filtered = (p, ch, pos, buffer, count) -> {
+            if (!filter.accept(p))
+                return;
+            callback.postWrite(p, ch, pos, buffer, count);
+        };
+        return onPostWrite(filtered);
+    }
+
+    public Unsubscribable onPrePosition(OnPrePosition callbackk)
+    {
+        return listen(callbackk);
+    }
+
+    /**
+     * Registers {@code callbackk} as a pre-position listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPrePosition(PathFilter filter, OnPrePosition callbackk)
+    {
+        OnPrePosition filtered = (p, ch, current, requested) -> {
+            if (!filter.accept(p))
+                return;
+            callbackk.prePosition(p, ch, current, requested);
+        };
+        return onPrePosition(filtered);
+    }
+
+    public Unsubscribable onPostPosition(OnPostPosition callbackk)
+    {
+        return listen(callbackk);
+    }
+
+    /**
+     * Registers {@code callbackk} as a post-position listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPostPosition(PathFilter filter, OnPostPosition callbackk)
+    {
+        OnPostPosition filtered = (p, ch, previous, requested) -> {
+            if (!filter.accept(p))
+                return;
+            callbackk.postPosition(p, ch, previous, requested);
+        };
+        return onPostPosition(filtered);
+    }
+
+    public Unsubscribable onPreTruncate(OnPreTruncate callbackk)
+    {
+        return listen(callbackk);
+    }
+
+    /**
+     * Registers {@code callbackk} as a pre-truncate listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPreTruncate(PathFilter filter, OnPreTruncate callbackk)
+    {
+        OnPreTruncate filtered = (p, ch, current, requested) -> {
+            if (!filter.accept(p))
+                return;
+            callbackk.preTruncate(p, ch, current, requested);
+        };
+        return onPreTruncate(filtered);
+    }
+
+    public Unsubscribable onPostTruncate(OnPostTruncate callbackk)
+    {
+        return listen(callbackk);
+    }
+
+    /**
+     * Registers {@code callbackk} as a post-truncate listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPostTruncate(PathFilter filter, OnPostTruncate callbackk)
+    {
+        OnPostTruncate filtered = (p, ch, previous, requested, resulting) -> {
+            if (!filter.accept(p))
+                return;
+            callbackk.postTruncate(p, ch, previous, requested, resulting);
+        };
+        return onPostTruncate(filtered);
+    }
+
+    public Unsubscribable onPreForce(OnPreForce callbackk)
+    {
+        return listen(callbackk);
+    }
+
+    /**
+     * Registers {@code callback} as a pre-force listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPreForce(PathFilter filter, OnPreForce callback)
+    {
+        OnPreForce filtered = (p, ch, withMetadata) -> {
+            if (!filter.accept(p))
+                return;
+            callback.preForce(p, ch, withMetadata);
+        };
+        return onPreForce(filtered);
+    }
+
+    public Unsubscribable onPostForce(OnPostForce callbackk)
+    {
+        return listen(callbackk);
+    }
+
+    /**
+     * Registers {@code callback} as a post-force listener that only fires for paths accepted by {@code filter}.
+     *
+     * @return a handle that unsubscribes the filtering listener when closed
+     */
+    public Unsubscribable onPostForce(PathFilter filter, OnPostForce callback)
+    {
+        OnPostForce filtered = (p, ch, withMetadata) -> {
+            if (!filter.accept(p))
+                return;
+            callback.postForce(p, ch, withMetadata);
+        };
+        return onPostForce(filtered);
+    }
+
+    public void remove(Listener listener)
+    {
+        remove(lists, listener);
+    }
+
+    /** Removes {@code listener} from each of the given listener lists. */
+    private static void remove(List<List<? extends Listener>> lists, Listener listener)
+    {
+        for (List<? extends Listener> registry : lists)
+        {
+            registry.remove(listener);
+        }
+    }
+
+    /** Empties every listener list, unsubscribing all registered listeners. */
+    public void clearListeners()
+    {
+        for (List<? extends Listener> registry : lists)
+        {
+            registry.clear();
+        }
+    }
+
+    /**
+     * Action applied to a single listener by {@code notifyListeners}; like a consumer, but allowed to throw
+     * {@link IOException}. Declares a single method so it can be supplied as a lambda.
+     */
+    @FunctionalInterface
+    private interface ListenerAction<T>
+    {
+        void accept(T value) throws IOException;
+    }
+
+    /**
+     * Applies {@code fn} to each listener in iteration order; the first {@link IOException} thrown
+     * propagates to the caller and the remaining listeners are not invoked.
+     */
+    private <T> void notifyListeners(List<T> listeners, ListenerAction<T> fn) throws IOException
+    {
+        for (T subscriber : listeners)
+        {
+            fn.accept(subscriber);
+        }
+    }
+
+    /** Returns {@code p} unchanged when it is already a {@code ListenablePath}; otherwise wraps it in a new one. */
+    @Override
+    protected Path wrap(Path p)
+    {
+        if (p instanceof ListenablePath)
+            return p;
+        return new ListenablePath(p);
+    }
+
+    /** Returns the delegate of a {@code ListenablePath}; any other path is returned unchanged. */
+    @Override
+    protected Path unwrap(Path p)
+    {
+        if (!(p instanceof ListenablePath))
+            return p;
+        return ((ListenablePath) p).delegate;
+    }
+
+    @Override
+    public ListenableFileSystemProvider provider()
+    {
+        return provider;
+    }
+
+
+    /**
+     * Provider that routes every stream and channel open through {@link #newFileChannel} so that the
+     * pre-open and post-open listeners fire and the returned channel is a {@code ListenableFileChannel}.
+     * Path wrapping and unwrapping are delegated to the enclosing {@code ListenableFileSystem}.
+     */
+    private class ListenableFileSystemProvider extends ForwardingFileSystemProvider
+    {
+        ListenableFileSystemProvider(FileSystemProvider delegate)
+        {
+            super(delegate);
+        }
+
+        @Override
+        protected Path wrap(Path a)
+        {
+            return ListenableFileSystem.this.wrap(a);
+        }
+
+        @Override
+        protected Path unwrap(Path p)
+        {
+            return ListenableFileSystem.this.unwrap(p);
+        }
+
+        /**
+         * Opens an output stream backed by a channel from {@link #newFileChannel}.
+         * With no options, CREATE and TRUNCATE_EXISTING are used; WRITE is always added.
+         *
+         * @throws IllegalArgumentException if {@code options} contains READ
+         */
+        @Override
+        public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException
+        {
+            int len = options.length;
+            Set<OpenOption> opts = new HashSet<>(len + 3);
+            if (len == 0)
+            {
+                opts.add(StandardOpenOption.CREATE);
+                opts.add(StandardOpenOption.TRUNCATE_EXISTING);
+            }
+            else
+            {
+                for (OpenOption opt : options)
+                {
+                    if (opt == StandardOpenOption.READ)
+                        throw new IllegalArgumentException("READ not allowed");
+                    opts.add(opt);
+                }
+            }
+            opts.add(StandardOpenOption.WRITE);
+            return Channels.newOutputStream(newFileChannel(path, opts));
+        }
+
+        /**
+         * Opens an input stream backed by a channel from {@link #newFileChannel}.
+         *
+         * @throws UnsupportedOperationException if {@code options} contains APPEND or WRITE
+         */
+        @Override
+        public InputStream newInputStream(Path path, OpenOption... options) throws IOException
+        {
+            for (OpenOption opt : options)
+            {
+                // All OpenOption values except for APPEND and WRITE are allowed
+                if (opt == StandardOpenOption.APPEND ||
+                    opt == StandardOpenOption.WRITE)
+                    throw new UnsupportedOperationException("'" + opt + "' not allowed");
+            }
+            Set<OpenOption> opts = new HashSet<>(Arrays.asList(options));
+            return Channels.newInputStream(newFileChannel(path, opts));
+        }
+
+        @Override
+        public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException
+        {
+            return newFileChannel(path, options, attrs);
+        }
+
+        /**
+         * Notifies the pre-open listeners, opens the channel on the delegate provider, wraps it in a
+         * {@code ListenableFileChannel} and notifies the post-open listeners.
+         * <p>
+         * If a post-open listener throws, the freshly opened delegate channel is closed before the failure
+         * propagates; the caller never receives the channel and so could not close it itself.
+         */
+        @Override
+        public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException
+        {
+            notifyListeners(onPreOpen, l -> l.preOpen(path, options, attrs));
+            FileChannel opened = delegate().newFileChannel(unwrap(path), options, attrs);
+            ListenableFileChannel channel = new ListenableFileChannel(path, opened);
+            try
+            {
+                notifyListeners(onPostOpen, l -> l.postOpen(path, options, attrs, channel));
+            }
+            catch (Throwable t)
+            {
+                try
+                {
+                    opened.close();
+                }
+                catch (Throwable suppressed)
+                {
+                    t.addSuppressed(suppressed);
+                }
+                throw t;
+            }
+            return channel;
+        }
+
+        @Override
+        public AsynchronousFileChannel newAsynchronousFileChannel(Path path, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs) throws IOException
+        {
+            throw new UnsupportedOperationException("TODO");
+        }
+
+        // block the APIs that try to switch FileSystem based off the URI scheme
+        @Override
+        public FileSystem newFileSystem(URI uri, Map<String, ?> env) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public FileSystem newFileSystem(Path path, Map<String, ?> env) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String getScheme()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public FileSystem getFileSystem(URI uri)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Path wrapper whose {@link #getFileSystem()} is the enclosing {@code ListenableFileSystem}; wrapping and
+     * unwrapping of related paths is delegated to that file system.
+     */
+    private class ListenablePath extends ForwardingPath
+    {
+        public ListenablePath(Path delegate)
+        {
+            super(delegate);
+        }
+
+        @Override
+        protected Path wrap(Path a)
+        {
+            return ListenableFileSystem.this.wrap(a);
+        }
+
+        @Override
+        protected Path unwrap(Path p)
+        {
+            return ListenableFileSystem.this.unwrap(p);
+        }
+
+        @Override
+        public FileSystem getFileSystem()
+        {
+            return ListenableFileSystem.this;
+        }
+
+        /**
+         * Converts to a {@link File} only when the delegate belongs to the default file system.
+         *
+         * @throws UnsupportedOperationException for a delegate on any other file system
+         */
+        @Override
+        public File toFile()
+        {
+            if (delegate().getFileSystem() != FileSystems.getDefault())
+                throw new UnsupportedOperationException();
+            return delegate().toFile();
+        }
+    }
+
+    private class ListenableFileChannel extends ForwardingFileChannel
+    {
+        private final AtomicReference<Mapped> mutable = new 
AtomicReference<>();
+        private final Path path;
+
+        ListenableFileChannel(Path path, FileChannel delegate)
+        {
+            super(delegate);
+            this.path = path;
+        }
+
+        /**
+         * Relative read: captures the current position, notifies the pre-read listeners, performs the read,
+         * then notifies the post-read listeners with the same starting position and the read result.
+         */
+        @Override
+        public int read(ByteBuffer dst) throws IOException
+        {
+            final long startedAt = position();
+            notifyListeners(onPreRead, listener -> listener.preRead(path, this, startedAt, dst));
+            final int bytesRead = super.read(dst);
+            notifyListeners(onPostRead, listener -> listener.postRead(path, this, startedAt, dst, bytesRead));
+            return bytesRead;
+        }
+
+        /**
+         * Positional read: notifies the pre-read listeners, performs the read at {@code position}, then
+         * notifies the post-read listeners with the read result.
+         */
+        @Override
+        public int read(ByteBuffer dst, long position) throws IOException
+        {
+            notifyListeners(onPreRead, listener -> listener.preRead(path, this, position, dst));
+            final int bytesRead = super.read(dst, position);
+            notifyListeners(onPostRead, listener -> listener.postRead(path, this, position, dst, bytesRead));
+            return bytesRead;
+        }
+
+        /**
+         * Relative write: captures the current position, notifies the pre-write listeners, performs the
+         * write, then notifies the post-write listeners with the same starting position and the write result.
+         */
+        @Override
+        public int write(ByteBuffer src) throws IOException
+        {
+            final long startedAt = position();
+            notifyListeners(onPreWrite, listener -> listener.preWrite(path, this, startedAt, src));
+            final int bytesWritten = super.write(src);
+            notifyListeners(onPostWrite, listener -> listener.postWrite(path, this, startedAt, src, bytesWritten));
+            return bytesWritten;
+        }
+
+        /**
+         * Positional write: notifies the pre-write listeners, performs the write at {@code position}, then
+         * notifies the post-write listeners with the write result.
+         */
+        @Override
+        public int write(ByteBuffer src, long position) throws IOException
+        {
+            notifyListeners(onPreWrite, listener -> listener.preWrite(path, this, position, src));
+            final int bytesWritten = super.write(src, position);
+            notifyListeners(onPostWrite, listener -> listener.postWrite(path, this, position, src, bytesWritten));
+            return bytesWritten;
+        }
+
+        /**
+         * Moves the channel position, notifying the pre- and post-position listeners with the position read
+         * before the move and the requested position.
+         *
+         * @return this channel
+         */
+        @Override
+        public FileChannel position(long newPosition) throws IOException
+        {
+            final long before = position();
+            notifyListeners(onPrePosition, listener -> listener.prePosition(path, this, before, newPosition));
+            super.position(newPosition);
+            notifyListeners(onPostPosition, listener -> listener.postPosition(path, this, before, newPosition));
+            return this;
+        }
+
+        /**
+         * Truncates the channel, notifying the pre-truncate listeners with the size read beforehand and the
+         * post-truncate listeners additionally with the size read afterwards.
+         *
+         * @return this channel
+         */
+        @Override
+        public FileChannel truncate(long size) throws IOException
+        {
+            final long sizeBefore = this.size();
+            notifyListeners(onPreTruncate, listener -> listener.preTruncate(path, this, sizeBefore, size));
+            super.truncate(size);
+            final long sizeAfter = this.size();
+            notifyListeners(onPostTruncate, listener -> listener.postTruncate(path, this, sizeBefore, size, sizeAfter));
+            return this;
+        }
+
+        /**
+         * Forces the channel, notifying the pre-force listeners before and the post-force listeners after
+         * the call is forwarded.
+         */
+        @Override
+        public void force(boolean metaData) throws IOException
+        {
+            notifyListeners(onPreForce, listener -> listener.preForce(path, this, metaData));
+            super.force(metaData);
+            notifyListeners(onPostForce, listener -> listener.postForce(path, this, metaData));
+        }
+
+        /**
+         * Transfers bytes to {@code target}, notifying the pre-transferTo listeners before and the
+         * post-transferTo listeners (with the transfer result) after the call is forwarded.
+         */
+        @Override
+        public long transferTo(long position, long count, WritableByteChannel target) throws IOException
+        {
+            notifyListeners(onPreTransferTo, listener -> listener.preTransferTo(path, this, position, count, target));
+            final long moved = super.transferTo(position, count, target);
+            notifyListeners(onPostTransferTo, listener -> listener.postTransferTo(path, this, position, count, target, moved));
+            return moved;
+        }
+
+        /**
+         * Transfers bytes from {@code src} via the superclass, notifying
+         * listeners before and after.
+         *
+         * @param src      source channel
+         * @param position file offset passed to {@code super.transferFrom}
+         * @param count    byte count passed to {@code super.transferFrom}
+         * @return whatever {@code super.transferFrom} returned
+         */
+        @Override
+        public long transferFrom(ReadableByteChannel src, long position, long 
count) throws IOException
+        {
+            notifyListeners(onPreTransferFrom, l -> l.preTransferFrom(path, 
this, src, position, count));
+            long transfered = super.transferFrom(src, position, count);
+            // post hook also receives the value returned by the transfer
+            notifyListeners(onPostTransferFrom, l -> l.postTransferFrom(path, 
this, src, position, count, transfered));
+            return transfered;
+        }
+
+        /**
+         * Stands in for {@code FileChannel.map} by handing out a freshly
+         * allocated direct buffer instead of a real memory mapping.
+         * <p>
+         * READ_ONLY: the buffer is filled from this channel starting at
+         * {@code position} and given a no-op attachment. READ_WRITE: the buffer
+         * is filled only when the file extends past {@code position}; its
+         * attachment writes the buffer back through
+         * {@link #write(ByteBuffer, long)}, and the mapping is recorded in
+         * {@code mutable}, which may hold only one mapping at a time. Every
+         * other mode is rejected.
+         *
+         * @param mode     READ_ONLY or READ_WRITE
+         * @param position file offset the buffer corresponds to
+         * @param size     buffer length in bytes; must fit in an int
+         * @return the direct buffer standing in for the mapped region
+         * @throws UnsupportedOperationException for any other mode, or when a
+         *         READ_WRITE mapping is already recorded
+         * @throws ArithmeticException if {@code size} does not fit in an int
+         */
+        @Override
+        public MappedByteBuffer map(MapMode mode, long position, long size) 
throws IOException
+        {
+            // this behavior isn't 100% correct... if you mix access with 
FileChannel and ByteBuffer you will get different
+            // results than with a real mmap solution... This limitation is 
due to ByteBuffer being private, so not able
+            // to create custom BBs to mimic this access...
+            if (mode == MapMode.READ_WRITE && mutable.get() != null)
+                throw new UnsupportedOperationException("map called twice with 
mode READ_WRITE; first was " + mutable.get() + ", now " + new Mapped(mode, 
null, position, Math.toIntExact(size)));
+
+            int isize = Math.toIntExact(size);
+            // the cast relies on allocateDirect returning a MappedByteBuffer subtype
+            MappedByteBuffer bb = (MappedByteBuffer) 
ByteBuffer.allocateDirect(isize);
+
+            Mapped mapped = new Mapped(mode, bb, position, isize);
+            if (mode == MapMode.READ_ONLY)
+            {
+                // copy the current file contents in, then flip so the buffer is
+                // positioned at 0 for the caller
+                ByteBufferUtil.readFully(this, mapped.bb, position);
+                mapped.bb.flip();
+
+                // nothing to write back for a read-only view
+                Runnable forcer = () -> {
+                };
+                MemoryUtil.setAttachment(bb, forcer);
+            }
+            else if (mode == MapMode.READ_WRITE)
+            {
+                // only read when there are bytes at or beyond position
+                if (delegate().size() - position > 0)
+                {
+                    ByteBufferUtil.readFully(this, mapped.bb, position);
+                    mapped.bb.flip();
+                }
+                // with real files the FD gets copied so the close of the 
channel does not block the BB mutation
+                // from flushing...  it's possible to support this use case, 
but kept things simpler for now by
+                // failing if the backing channel was closed.
+                // NOTE(review): nothing in this method runs the attachment;
+                // presumably a sync helper invokes it in place of
+                // MappedByteBuffer.force() - confirm against callers
+                Runnable forcer = () -> {
+                    // duplicate so flushing does not move the caller's position
+                    ByteBuffer local = bb.duplicate();
+                    local.position(0);
+                    long pos = position;
+                    try
+                    {
+                        // loop because a single write may be partial
+                        while (local.hasRemaining())
+                        {
+                            int wrote = write(local, pos);
+                            if (wrote == -1)
+                                throw new EOFException();
+                            pos += wrote;
+                        }
+                    }
+                    catch (IOException e)
+                    {
+                        // Runnable cannot throw checked exceptions
+                        throw new UncheckedIOException(e);
+                    }
+                };
+                MemoryUtil.setAttachment(bb, forcer);
+                // second guard: fails if another mapping was recorded since the
+                // check at the top of the method
+                if (!mutable.compareAndSet(null, mapped))
+                    throw new UnsupportedOperationException("map called 
twice");
+            }
+            else
+            {
+                throw new UnsupportedOperationException("Unsupported mode: " + 
mode);
+            }
+            return mapped.bb;
+        }
+
+        /**
+         * Closes via the superclass, then clears the mapping recorded in
+         * {@code mutable}.
+         */
+        @Override
+        protected void implCloseChannel() throws IOException
+        {
+            super.implCloseChannel();
+            // not reached if the superclass close throws
+            mutable.set(null);
+        }
+    }
+
+    /**
+     * Immutable record of one {@code map} call: the requested mode, the buffer
+     * handed out, and the file offset and length it covers.
+     */
+    private static class Mapped
+    {
+        final FileChannel.MapMode mode;
+        // may be null: map() passes null when it builds a Mapped only for an error message
+        final MappedByteBuffer bb;
+        final long position;
+        final int size;
+
+        private Mapped(FileChannel.MapMode mode, MappedByteBuffer bb, long 
position, int size)
+        {
+            this.mode = mode;
+            this.bb = bb;
+            this.position = position;
+            this.size = size;
+        }
+
+        /**
+         * Describes the mapping by mode, position and size; the buffer itself
+         * is not included.
+         */
+        @Override
+        public String toString()
+        {
+            return "Mapped{" +
+                   "mode=" + mode +
+                   ", position=" + position +
+                   ", size=" + size +
+                   '}';
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/io/util/FileSystems.java 
b/test/unit/org/apache/cassandra/io/util/FileSystems.java
new file mode 100644
index 0000000000..fcd5db7dac
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/FileSystems.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.file.FileSystem;
+
+import com.google.common.base.StandardSystemProperty;
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+
+import org.apache.cassandra.io.filesystem.ForwardingFileSystem;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+
+/**
+ * Test helpers for creating file systems and installing them as the one
+ * {@code File} resolves against (via {@code File.unsafeSetFilesystem}).
+ */
+public class FileSystems
+{
+    /**
+     * Creates a new Jimfs file system and installs it through
+     * {@link #global(FileSystem)}.
+     */
+    public static ListenableFileSystem newGlobalInMemoryFileSystem()
+    {
+        return global(jimfs());
+    }
+
+    /**
+     * Wraps the currently installed file system in a listenable one and
+     * installs the wrapper in its place.
+     */
+    public static ListenableFileSystem global()
+    {
+        return global(File.unsafeGetFilesystem());
+    }
+
+    /**
+     * Wraps {@code real} in a {@link ListenableFileSystem}, installs the wrapper
+     * via {@code File.unsafeSetFilesystem} and returns it. Closing the wrapper's
+     * forwarding layer re-installs whichever file system was in place when this
+     * method was called.
+     *
+     * @param real file system that calls are forwarded to
+     * @return the installed listenable wrapper
+     */
+    public static ListenableFileSystem global(FileSystem real)
+    {
+        // remembered so close() can put it back
+        FileSystem current = File.unsafeGetFilesystem();
+        ListenableFileSystem fs = new ListenableFileSystem(new 
ForwardingFileSystem(real)
+        {
+            @Override
+            public void close() throws IOException
+            {
+                try
+                {
+                    super.close();
+                }
+                finally
+                {
+                    // restore even when the close above throws
+                    File.unsafeSetFilesystem(current);
+                }
+            }
+        });
+        File.unsafeSetFilesystem(fs);
+        return fs;
+    }
+
+    /**
+     * Creates a new Jimfs file system using {@link #jimfsConfig()}; nothing is
+     * installed globally.
+     */
+    public static FileSystem jimfs()
+    {
+        return Jimfs.newFileSystem(jimfsConfig());
+    }
+
+    /**
+     * Same as {@link #jimfs()} but passes {@code name} to Jimfs.
+     */
+    public static FileSystem jimfs(String name)
+    {
+        return Jimfs.newFileSystem(name, jimfsConfig());
+    }
+
+    /**
+     * Jimfs unix configuration with max size {@code 4L << 30} (4 GiB when
+     * counted in bytes) and block size 512.
+     */
+    private static Configuration jimfsConfig()
+    {
+        return Configuration.unix().toBuilder()
+                            .setMaxSize(4L << 30).setBlockSize(512)
+                            .build();
+    }
+
+    /**
+     * Returns the directory named by the {@code java.io.tmpdir} system property,
+     * attempting to create it when it does not exist.
+     */
+    public static File maybeCreateTmp()
+    {
+        File dir = new File(StandardSystemProperty.JAVA_IO_TMPDIR.value());
+        // the result of tryCreateDirectories() is ignored, so the returned
+        // directory is not guaranteed to exist
+        if (!dir.exists())
+            dir.tryCreateDirectories();
+        return dir;
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/schema/RemoveWithoutDroppingTest.java 
b/test/unit/org/apache/cassandra/schema/RemoveWithoutDroppingTest.java
index 9c3271bd69..fb9a3e13cf 100644
--- a/test/unit/org/apache/cassandra/schema/RemoveWithoutDroppingTest.java
+++ b/test/unit/org/apache/cassandra/schema/RemoveWithoutDroppingTest.java
@@ -28,6 +28,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.util.File;
@@ -46,6 +47,8 @@ public class RemoveWithoutDroppingTest
     @BeforeClass
     public static void beforeClass()
     {
+        ServerTestUtils.daemonInitialization();
+
         
System.setProperty(SchemaUpdateHandlerFactoryProvider.SUH_FACTORY_CLASS_PROPERTY,
 TestSchemaUpdateHandlerFactory.class.getName());
         CQLTester.prepareServer();
         Schema.instance.registerListener(listener);
diff --git a/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java 
b/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
index ed51518719..4a93c67bfe 100644
--- a/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
+++ b/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
@@ -23,6 +23,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -44,6 +45,8 @@ public class PartitionDenylistTest
     @BeforeClass
     public static void init()
     {
+        ServerTestUtils.daemonInitialization();
+
         CQLTester.prepareServer();
 
         KeyspaceMetadata schema = KeyspaceMetadata.create(ks_cql,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to