This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 62948a8c52 CEP-15: (C*) Enhance in-memory FileSystem to work with mmap
and support tests to add custom logic
62948a8c52 is described below
commit 62948a8c525cee5eb2115e9f2441342b48f4cb89
Author: David Capwell <[email protected]>
AuthorDate: Mon May 8 11:56:37 2023 -0700
CEP-15: (C*) Enhance in-memory FileSystem to work with mmap and support
tests to add custom logic
patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18485
---
.../org/apache/cassandra/hints/HintsCatalog.java | 2 +
src/java/org/apache/cassandra/io/util/File.java | 19 +
.../org/apache/cassandra/utils/ByteBufferUtil.java | 13 +
src/java/org/apache/cassandra/utils/SyncUtil.java | 8 +
.../cassandra/distributed/impl/Instance.java | 5 +-
.../cassandra/simulator/ClusterSimulation.java | 47 +-
.../apache/cassandra/auth/GrantAndRevokeTest.java | 2 +
.../apache/cassandra/cql3/CDCStatementTest.java | 2 +
test/unit/org/apache/cassandra/cql3/CQLTester.java | 52 +-
.../cassandra/cql3/CustomNowInSecondsTest.java | 3 +
.../apache/cassandra/cql3/GcCompactionTest.java | 2 +-
.../org/apache/cassandra/cql3/KeyCacheCqlTest.java | 2 +-
.../cassandra/cql3/PreparedStatementsTest.java | 2 +
.../cql3/selection/SelectionColumnMappingTest.java | 3 +
.../validation/entities/FrozenCollectionsTest.java | 2 +
.../cql3/validation/entities/JsonTest.java | 2 +
.../cql3/validation/entities/UserTypesTest.java | 3 +
.../cql3/validation/entities/VirtualTableTest.java | 3 +
.../miscellaneous/SSTablesIteratedTest.java | 2 +-
.../InsertUpdateIfConditionCollectionsTest.java | 3 +
.../InsertUpdateIfConditionStaticsTest.java | 2 +
.../operations/InsertUpdateIfConditionTest.java | 2 +
.../validation/operations/SelectLimitTest.java | 3 +
.../cassandra/db/CorruptPrimaryIndexTest.java | 56 ++
.../unit/org/apache/cassandra/db/KeyspaceTest.java | 2 +-
.../commitlog/CommitLogSegmentManagerCDCTest.java | 3 +
.../db/memtable/MemtableSizeTestBase.java | 2 +
.../db/virtual/CredentialsCacheKeysTableTest.java | 3 +
.../NetworkPermissionsCacheKeysTableTest.java | 3 +
.../db/virtual/PermissionsCacheKeysTableTest.java | 2 +
.../db/virtual/RolesCacheKeysTableTest.java | 2 +
.../io/filesystem/ForwardingFileChannel.java | 146 ++++
.../io/filesystem/ForwardingFileSystem.java | 129 ++++
.../filesystem/ForwardingFileSystemProvider.java | 246 ++++++
.../cassandra/io/filesystem/ForwardingPath.java | 234 ++++++
.../io/filesystem/ListenableFileSystem.java | 856 +++++++++++++++++++++
.../org/apache/cassandra/io/util/FileSystems.java | 89 +++
.../schema/RemoveWithoutDroppingTest.java | 3 +
.../cassandra/service/PartitionDenylistTest.java | 3 +
39 files changed, 1931 insertions(+), 32 deletions(-)
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java
b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index ecde896b90..6bc0030924 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -168,6 +168,8 @@ final class HintsCatalog
}
else
{
+ if (SyncUtil.SKIP_SYNC)
+ return;
logger.error("Unable to open directory {}",
hintsDirectory.absolutePath());
FileUtils.handleFSErrorAndPropagate(new FSWriteError(new
IOException(String.format("Unable to open hint directory %s",
hintsDirectory.absolutePath())), hintsDirectory.absolutePath()));
}
diff --git a/src/java/org/apache/cassandra/io/util/File.java
b/src/java/org/apache/cassandra/io/util/File.java
index c9bc97c3a9..de415388ed 100644
--- a/src/java/org/apache/cassandra/io/util/File.java
+++ b/src/java/org/apache/cassandra/io/util/File.java
@@ -38,6 +38,7 @@ import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import net.openhft.chronicle.core.util.ThrowingFunction;
@@ -125,6 +126,18 @@ public class File implements Comparable<File>
if (!path.isAbsolute() || path.isOpaque()) throw new
IllegalArgumentException();
}
+ /**
+ * Unsafe constructor that allows a File to use a different {@link
FileSystem} than {@link File#filesystem}.
+ *
+ * The main callers of such a method are cases such as JVM Dtest functions
that need access to the logging framework
+ * files, which exist only in {@link FileSystems#getDefault()}.
+ */
+ @VisibleForTesting
+ public File(FileSystem fs, String first, String... more)
+ {
+ this.path = fs.getPath(first, more);
+ }
+
/**
* @param path the path to wrap
*/
@@ -777,6 +790,12 @@ public class File implements Comparable<File>
return path;
}
+ @VisibleForTesting
+ public static FileSystem unsafeGetFilesystem()
+ {
+ return filesystem;
+ }
+
public static void unsafeSetFilesystem(FileSystem fs)
{
filesystem = fs;
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 615138e7d2..4b740578a3 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -24,10 +24,12 @@ package org.apache.cassandra.utils;
*/
import java.io.DataInput;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -934,6 +936,17 @@ public class ByteBufferUtil
return true;
}
+ public static void readFully(FileChannel channel, ByteBuffer dst, long
position) throws IOException
+ {
+ while (dst.hasRemaining())
+ {
+ int read = channel.read(dst, position);
+ if (read == -1)
+ throw new EOFException();
+ position += read;
+ }
+ }
+
public static <T> ByteBuffer serialized(IVersionedSerializer<T>
serializer, T value, int version)
{
try (DataOutputBuffer dob = new DataOutputBuffer())
diff --git a/src/java/org/apache/cassandra/utils/SyncUtil.java
b/src/java/org/apache/cassandra/utils/SyncUtil.java
index 6055859531..081a5f1e16 100644
--- a/src/java/org/apache/cassandra/utils/SyncUtil.java
+++ b/src/java/org/apache/cassandra/utils/SyncUtil.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.config.Config;
import com.google.common.base.Preconditions;
import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,6 +101,12 @@ public class SyncUtil
public static MappedByteBuffer force(MappedByteBuffer buf)
{
Preconditions.checkNotNull(buf);
+ Object attachment = MemoryUtil.getAttachment(buf);
+ if (attachment instanceof Runnable)
+ {
+ ((Runnable) attachment).run();
+ return buf;
+ }
if (SKIP_SYNC)
{
Object fd = null;
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 348bde4666..84cf5cf1fc 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -25,6 +25,7 @@ import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
import java.security.Permission;
import java.util.ArrayList;
import java.util.Collections;
@@ -221,10 +222,10 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
String suite = System.getProperty("suitename",
"suitename_IS_UNDEFINED");
String clusterId = ClusterIDDefiner.getId();
String instanceId = InstanceIDDefiner.getInstanceId();
- File f = new
File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite,
clusterId, instanceId));
+ File f = new File(FileSystems.getDefault(),
String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId,
instanceId));
// when creating a cluster globally in a test class we get the logs
without the suite, try finding those logs:
if (!f.exists())
- f = new File(String.format("build/test/logs/%s/%s/%s/system.log",
tag, clusterId, instanceId));
+ f = new File(FileSystems.getDefault(),
String.format("build/test/logs/%s/%s/%s/system.log", tag, clusterId,
instanceId));
if (!f.exists())
throw new AssertionError("Unable to locate system.log under " +
new File("build/test/logs").absolutePath() + "; make sure ICluster.setup() is
called or extend TestBaseImpl and do not define a static beforeClass function
with @BeforeClass");
return new FileLogAction(f);
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
index ac3a773453..b0257c62b2 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
@@ -21,8 +21,8 @@ package org.apache.cassandra.simulator;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
-import java.nio.file.FileSystem;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -35,8 +35,6 @@ import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
-import com.google.common.jimfs.Configuration;
-import com.google.common.jimfs.Jimfs;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
@@ -55,6 +53,7 @@ import
org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnab
import org.apache.cassandra.distributed.impl.DirectStreamingConnectionFactory;
import org.apache.cassandra.distributed.impl.IsolatedExecutor;
import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.FileSystems;
import org.apache.cassandra.service.paxos.BallotGenerator;
import org.apache.cassandra.service.paxos.PaxosPrepare;
import org.apache.cassandra.simulator.RandomSource.Choices;
@@ -62,6 +61,7 @@ import
org.apache.cassandra.simulator.asm.InterceptAsClassTransformer;
import org.apache.cassandra.simulator.asm.NemesisFieldSelectors;
import org.apache.cassandra.simulator.cluster.ClusterActions;
import org.apache.cassandra.simulator.cluster.ClusterActions.TopologyChange;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
import org.apache.cassandra.simulator.systems.Failures;
import
org.apache.cassandra.simulator.systems.InterceptedWait.CaptureSites.Capture;
import org.apache.cassandra.simulator.systems.InterceptibleThread;
@@ -605,7 +605,7 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
public final SimulatedSystems simulated;
public final Cluster cluster;
public final S simulation;
- private final FileSystem jimfs;
+ private final ListenableFileSystem fs;
protected final Map<Integer, List<Closeable>> onUnexpectedShutdown = new
TreeMap<>();
protected final List<Callable<Void>> onShutdown = new
CopyOnWriteArrayList<>();
protected final ThreadLocalRandomCheck threadLocalRandomCheck;
@@ -616,9 +616,7 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
SimulationFactory<S> factory) throws IOException
{
this.random = random;
- this.jimfs = Jimfs.newFileSystem(Long.toHexString(seed) + 'x' +
uniqueNum, Configuration.unix().toBuilder()
-
.setMaxSize(4L << 30).setBlockSize(512)
-
.build());
+ this.fs = new
ListenableFileSystem(FileSystems.jimfs(Long.toHexString(seed) + 'x' +
uniqueNum));
final SimulatedMessageDelivery delivery;
final SimulatedExecution execution;
@@ -671,23 +669,28 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
Failures failures = new Failures();
ThreadAllocator threadAllocator = new ThreadAllocator(random,
builder.threadCount, numOfNodes);
+ List<String> allowedDiskAccessModes = Arrays.asList("mmap",
"mmap_index_only", "standard");
+ String disk_access_mode = allowedDiskAccessModes.get(random.uniform(0,
allowedDiskAccessModes.size() - 1));
+ boolean commitlogCompressed = random.decide(.5f);
cluster = snitch.setup(Cluster.build(numOfNodes)
- .withRoot(jimfs.getPath("/cassandra"))
+ .withRoot(fs.getPath("/cassandra"))
.withSharedClasses(sharedClassPredicate)
- .withConfig(config ->
configUpdater.accept(threadAllocator.update(config
- .with(Feature.BLANK_GOSSIP)
- .set("read_request_timeout",
String.format("%dms", NANOSECONDS.toMillis(builder.readTimeoutNanos)))
- .set("write_request_timeout",
String.format("%dms", NANOSECONDS.toMillis(builder.writeTimeoutNanos)))
- .set("cas_contention_timeout",
String.format("%dms", NANOSECONDS.toMillis(builder.contentionTimeoutNanos)))
- .set("request_timeout", String.format("%dms",
NANOSECONDS.toMillis(builder.requestTimeoutNanos)))
- .set("memtable_heap_space", "1MiB")
- .set("memtable_allocation_type",
builder.memoryListener != null ? "unslabbed_heap_buffers_logged" :
"heap_buffers")
- .set("file_cache_size", "16MiB")
- .set("use_deterministic_table_id", true)
- .set("disk_access_mode", "standard")
- .set("failure_detector",
SimulatedFailureDetector.Instance.class.getName())
- .set("commitlog_compression", new
ParameterizedClass(LZ4Compressor.class.getName(), emptyMap()))
- )))
+ .withConfig(config -> {
+ config.with(Feature.BLANK_GOSSIP)
+ .set("read_request_timeout",
String.format("%dms", NANOSECONDS.toMillis(builder.readTimeoutNanos)))
+ .set("write_request_timeout",
String.format("%dms", NANOSECONDS.toMillis(builder.writeTimeoutNanos)))
+ .set("cas_contention_timeout",
String.format("%dms", NANOSECONDS.toMillis(builder.contentionTimeoutNanos)))
+ .set("request_timeout",
String.format("%dms", NANOSECONDS.toMillis(builder.requestTimeoutNanos)))
+ .set("memtable_heap_space", "1MiB")
+ .set("memtable_allocation_type",
builder.memoryListener != null ? "unslabbed_heap_buffers_logged" :
"heap_buffers")
+ .set("file_cache_size", "16MiB")
+ .set("use_deterministic_table_id", true)
+ .set("disk_access_mode", disk_access_mode)
+ .set("failure_detector",
SimulatedFailureDetector.Instance.class.getName());
+ if (commitlogCompressed)
+ config.set("commitlog_compression", new
ParameterizedClass(LZ4Compressor.class.getName(), emptyMap()));
+
configUpdater.accept(threadAllocator.update(config));
+ })
.withInstanceInitializer(new IInstanceInitializer()
{
@Override
diff --git a/test/unit/org/apache/cassandra/auth/GrantAndRevokeTest.java
b/test/unit/org/apache/cassandra/auth/GrantAndRevokeTest.java
index 5c1c2a0298..eeff052ed0 100644
--- a/test/unit/org/apache/cassandra/auth/GrantAndRevokeTest.java
+++ b/test/unit/org/apache/cassandra/auth/GrantAndRevokeTest.java
@@ -22,6 +22,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
@@ -36,6 +37,7 @@ public class GrantAndRevokeTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
DatabaseDescriptor.setPermissionsValidity(0);
CQLTester.setUpClass();
requireAuthentication();
diff --git a/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
index fb24aa99c2..55865b1573 100644
--- a/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
+++ b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
@@ -22,6 +22,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
public class CDCStatementTest extends CQLTester
@@ -29,6 +30,7 @@ public class CDCStatementTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
DatabaseDescriptor.setCDCEnabled(true);
CQLTester.setUpClass();
}
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 92ffecb75c..c532d8b8f9 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -69,13 +69,17 @@ import org.apache.cassandra.auth.AuthTestUtils;
import org.apache.cassandra.auth.IRoleManager;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileSystems;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
@@ -192,8 +196,6 @@ public abstract class CQLTester
nativeAddr = InetAddress.getLoopbackAddress();
nativePort = getAutomaticallyAllocatedPort(nativeAddr);
-
- ServerTestUtils.daemonInitialization();
}
private List<String> keyspaces = new ArrayList<>();
@@ -319,6 +321,8 @@ public abstract class CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
if (ROW_CACHE_SIZE_IN_MIB > 0)
DatabaseDescriptor.setRowCacheSizeInMiB(ROW_CACHE_SIZE_IN_MIB);
StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
@@ -1333,7 +1337,7 @@ public abstract class CQLTester
return QueryProcessor.instance.prepare(formatQuery(query),
ClientState.forInternalCalls());
}
- protected UntypedResultSet execute(String query, Object... values) throws
Throwable
+ protected UntypedResultSet execute(String query, Object... values)
{
return executeFormattedQuery(formatQuery(query), values);
}
@@ -1347,7 +1351,7 @@ public abstract class CQLTester
* Executes the provided query using the {@link
ClientState#forInternalCalls()} as the expected ClientState. Note:
* this means permissions checking will not apply and queries will proceed
regardless of role or guardrails.
*/
- protected UntypedResultSet executeFormattedQuery(String query, Object...
values) throws Throwable
+ protected UntypedResultSet executeFormattedQuery(String query, Object...
values)
{
UntypedResultSet rs;
if (usePrepared)
@@ -2414,4 +2418,44 @@ public abstract class CQLTester
&& Objects.equal(password, u.password);
}
}
+
+ public static abstract class InMemory extends CQLTester
+ {
+ protected static ListenableFileSystem fs = null;
+
+ /**
+ * Used by {@link #cleanupFileSystemListeners()} to know if file
system listeners should be removed at the start
+ * of a test; can disable for cases where listeners are needed across
multiple tests.
+ */
+ protected boolean cleanupFileSystemListeners = true;
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ fs = FileSystems.newGlobalInMemoryFileSystem();
+
CassandraRelevantProperties.IGNORE_MISSING_NATIVE_FILE_HINTS.setBoolean(true);
+ FileSystems.maybeCreateTmp();
+
+ CQLTester.setUpClass();
+ }
+ @Before
+ public void cleanupFileSystemListeners()
+ {
+ if (!cleanupFileSystemListeners)
+ return;
+ fs.clearListeners();
+ }
+
+ protected ListenableFileSystem.PathFilter
isCurrentTableIndexFile(String keyspace)
+ {
+ return path -> {
+ if (!path.getFileName().toString().endsWith("Index.db"))
+ return false;
+ Descriptor desc = Descriptor.fromFile(new File(path));
+ if (!desc.ksname.equals(keyspace) &&
desc.cfname.equals(currentTable()))
+ return false;
+ return true;
+ };
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
b/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
index 8768b77ca9..ce9b103c67 100644
--- a/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/CustomNowInSecondsTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -43,6 +44,8 @@ public class CustomNowInSecondsTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
prepareServer();
requireNetwork();
}
diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
index eded145c66..91111d19d0 100644
--- a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
@@ -55,7 +55,7 @@ public class GcCompactionTest extends CQLTester
}
@Override
- protected UntypedResultSet execute(String query, Object... values) throws
Throwable
+ protected UntypedResultSet execute(String query, Object... values)
{
return executeFormattedQuery(formatQuery(KEYSPACE_PER_TEST, query),
values);
}
diff --git a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
index d40b001e90..779b39eea3 100644
--- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@ -120,7 +120,7 @@ public class KeyCacheCqlTest extends CQLTester
}
@Override
- protected UntypedResultSet execute(String query, Object... values) throws
Throwable
+ protected UntypedResultSet execute(String query, Object... values)
{
return executeFormattedQuery(formatQuery(KEYSPACE_PER_TEST, query),
values);
}
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index e6a41dd70f..efc4793c9d 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -34,6 +34,7 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.SyntaxError;
import com.datastax.driver.core.exceptions.WriteTimeoutException;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
@@ -60,6 +61,7 @@ public class PreparedStatementsTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
DatabaseDescriptor.setAccordTransactionsEnabled(true);
CQLTester.setUpClass();
}
diff --git
a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index 659f34a737..d36c0e2bd2 100644
---
a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++
b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
@@ -54,6 +55,8 @@ public class SelectionColumnMappingTest extends CQLTester
@BeforeClass
public static void setUpClass() // overrides CQLTester.setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
prepareServer();
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
index 5cbfb4cff5..c4cdca1e57 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.db.marshal.*;
@@ -43,6 +44,7 @@ public class FrozenCollectionsTest extends CQLTester
@BeforeClass
public static void setUpClass() // overrides CQLTester.setUpClass()
{
+ ServerTestUtils.daemonInitialization();
// Selecting partitioner for a table is not exposed on CREATE TABLE.
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
index 3f5408fed4..502192c274 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.cql3.validation.entities;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Json;
import org.apache.cassandra.cql3.CQLTester;
@@ -50,6 +51,7 @@ public class JsonTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
if (ROW_CACHE_SIZE_IN_MIB > 0)
DatabaseDescriptor.setRowCacheSizeInMiB(ROW_CACHE_SIZE_IN_MIB);
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
index 1520b4cab9..0d99494979 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java
@@ -22,6 +22,7 @@ import java.util.UUID;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -32,6 +33,8 @@ public class UserTypesTest extends CQLTester
@BeforeClass
public static void setUpClass() // overrides CQLTester.setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
// Selecting partitioner for a table is not exposed on CREATE TABLE.
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
index a17f74cec2..b2da6e59ca 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
@@ -39,6 +39,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import com.datastax.driver.core.exceptions.InvalidQueryException;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.marshal.Int32Type;
@@ -208,6 +209,8 @@ public class VirtualTableTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
TableMetadata vt1Metadata = TableMetadata.builder(KS_NAME, VT1_NAME)
.kind(TableMetadata.Kind.VIRTUAL)
.addPartitionKeyColumn("pk", UTF8Type.instance)
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
index 83d12f507e..3c7ce37d04 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java
@@ -79,7 +79,7 @@ public class SSTablesIteratedTest extends CQLTester
}
@Override
- protected UntypedResultSet execute(String query, Object... values) throws
Throwable
+ protected UntypedResultSet execute(String query, Object... values)
{
return executeFormattedQuery(formatQuery(KEYSPACE_PER_TEST, query),
values);
}
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
index c0a5139b5c..fe4073c19c 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
@@ -29,6 +29,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
@@ -51,6 +52,8 @@ public class InsertUpdateIfConditionCollectionsTest extends
CQLTester
@Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
public static Collection<Object[]> data()
{
+ ServerTestUtils.daemonInitialization();
+
return InsertUpdateIfConditionTest.data();
}
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
index b9c9ce33cb..3270fc157a 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
/* InsertUpdateIfConditionCollectionsTest class has been split into multiple
ones because of timeout issues (CASSANDRA-16670)
@@ -47,6 +48,7 @@ public class InsertUpdateIfConditionStaticsTest extends
CQLTester
@Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
public static Collection<Object[]> data()
{
+ ServerTestUtils.daemonInitialization();
return InsertUpdateIfConditionTest.data();
}
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index 8429ec0b3d..c617544684 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.Duration;
@@ -61,6 +62,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
@Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
public static Collection<Object[]> data()
{
+ ServerTestUtils.daemonInitialization();
return Arrays.asList(new Object[]{ "3.0", (Runnable) () -> {
Pair<Boolean, CassandraVersion> res =
Gossiper.instance.isUpgradingFromVersionLowerThanC17653(new
CassandraVersion("3.11"));
assertTrue(debugMsgCASSANDRA17653(res),
res.left);
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
index 2e419e1f1b..347bc1d445 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.cql3.validation.operations;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -34,6 +35,8 @@ public class SelectLimitTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
diff --git a/test/unit/org/apache/cassandra/db/CorruptPrimaryIndexTest.java
b/test/unit/org/apache/cassandra/db/CorruptPrimaryIndexTest.java
new file mode 100644
index 0000000000..c46e96ceec
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/CorruptPrimaryIndexTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class CorruptPrimaryIndexTest extends CQLTester.InMemory
+{
+ @Test
+ public void bigPrimaryIndexDoesNotDetectDiskCorruption()
+ {
+ // Set listener early, before the file is opened; mmap access cannot be
+ // listened to, so we need to observe the open, which happens on flush
+ fs.onPostRead(isCurrentTableIndexFile(keyspace()), (path, channel,
position, dst, read) -> {
+ // Reading the Primary index for the test!
+ // format
+ // 2 bytes: length of bytes for PK
+ // 4 bytes: pk as an int32
+ // variable bytes (see
+ // org.apache.cassandra.io.sstable.format.big.RowIndexEntry.IndexSerializer.deserialize(org.apache.cassandra.io.util.FileDataInput))
+ assertThat(position).describedAs("Unexpected access, should start
read from start of file").isEqualTo(0);
+
+ // simulate bit rot by having 1 byte change... but make sure it's
+ // the pk!
+ dst.put(2, Byte.MAX_VALUE);
+ });
+
+ createTable("CREATE TABLE %s (id int PRIMARY KEY, value int)");
+ execute("INSERT INTO %s (id, value) VALUES (?, ?)", 0, 0);
+ flush();
+
+ UntypedResultSet rs = execute("SELECT * FROM %s WHERE id=?", 0);
+ // this assert check is here to get the test to be green... if the
+ // format is fixed and this data loss is not happening anymore,
+ // then this check should be updated
+ assertThatThrownBy(() -> assertRows(rs, row(0, 0))).hasMessage("Got
less rows than expected. Expected 1 but got 0");
+ }
+}
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index a2fa9e9528..dab306de7e 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -61,7 +61,7 @@ public class KeyspaceTest extends CQLTester
}
@Override
- protected UntypedResultSet execute(String query, Object... values) throws
Throwable
+ protected UntypedResultSet execute(String query, Object... values)
{
return executeFormattedQuery(formatQuery(KEYSPACE_PER_TEST, query),
values);
}
diff --git
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index 3789b51714..5f6f235ece 100644
---
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@ -27,6 +27,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import
com.google.monitoring.runtime.instrumentation.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileReader;
import org.junit.Assert;
@@ -51,6 +52,8 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
DatabaseDescriptor.setCDCEnabled(true);
DatabaseDescriptor.setCDCTotalSpaceInMiB(1024);
CQLTester.setUpClass();
diff --git
a/test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java
b/test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java
index d817dbf939..77605279b9 100644
--- a/test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java
+++ b/test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java
@@ -30,6 +30,7 @@ import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -76,6 +77,7 @@ public abstract class MemtableSizeTestBase extends CQLTester
public static void setup(Config.MemtableAllocationType allocationType)
{
+ ServerTestUtils.daemonInitialization();
try
{
Field confField =
DatabaseDescriptor.class.getDeclaredField("conf");
diff --git
a/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
index 45132a962e..7aa1ec4fc6 100644
---
a/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
+++
b/test/unit/org/apache/cassandra/db/virtual/CredentialsCacheKeysTableTest.java
@@ -26,6 +26,7 @@ import org.junit.Test;
import com.datastax.driver.core.EndPoint;
import com.datastax.driver.core.PlainTextAuthProvider;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.auth.AuthTestUtils;
import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.auth.IAuthenticator;
@@ -48,6 +49,8 @@ public class CredentialsCacheKeysTableTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
// high value is used for convenient debugging
DatabaseDescriptor.setCredentialsValidity(20_000);
diff --git
a/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
index f944884098..e935bbb214 100644
---
a/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
+++
b/test/unit/org/apache/cassandra/db/virtual/NetworkPermissionsCacheKeysTableTest.java
@@ -24,6 +24,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.auth.AuthTestUtils;
import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.auth.DCPermissions;
@@ -46,6 +47,8 @@ public class NetworkPermissionsCacheKeysTableTest extends
CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
+
// high value is used for convenient debugging
DatabaseDescriptor.setPermissionsValidity(20_000);
diff --git
a/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
index 2171b2d035..429517521f 100644
---
a/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
+++
b/test/unit/org/apache/cassandra/db/virtual/PermissionsCacheKeysTableTest.java
@@ -28,6 +28,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.auth.AuthTestUtils;
import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.auth.DataResource;
@@ -52,6 +53,7 @@ public class PermissionsCacheKeysTableTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
// high value is used for convenient debugging
DatabaseDescriptor.setPermissionsValidity(20_000);
diff --git
a/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
index 40c3037005..de14aefd63 100644
--- a/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/RolesCacheKeysTableTest.java
@@ -24,6 +24,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.auth.AuthTestUtils;
import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.auth.IRoleManager;
@@ -46,6 +47,7 @@ public class RolesCacheKeysTableTest extends CQLTester
@BeforeClass
public static void setUpClass()
{
+ ServerTestUtils.daemonInitialization();
// high value is used for convenient debugging
DatabaseDescriptor.setRolesValidity(20_000);
diff --git
a/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileChannel.java
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileChannel.java
new file mode 100644
index 0000000000..4fecd24d38
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileChannel.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+public class ForwardingFileChannel extends FileChannel
+{
+ protected final FileChannel delegate;
+
+ public ForwardingFileChannel(FileChannel delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ protected FileChannel delegate()
+ {
+ return delegate;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException
+ {
+ return delegate().read(dst);
+ }
+
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length) throws
IOException
+ {
+ return delegate().read(dsts, offset, length);
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ return delegate().write(src);
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws
IOException
+ {
+ return delegate().write(srcs, offset, length);
+ }
+
+ @Override
+ public long position() throws IOException
+ {
+ return delegate().position();
+ }
+
+ /**
+ * Sets the delegate's file position to {@code newPosition}.
+ *
+ * @param newPosition the new position, forwarded unchanged to the delegate
+ * @return this forwarding channel, so that chained calls keep going through the wrapper
+ * @throws IOException if the delegate fails to reposition
+ */
+ @Override
+ public FileChannel position(long newPosition) throws IOException
+ {
+ // FileChannel#position(long) is specified to return "this file channel"; returning the
+ // delegate's result would hand the unwrapped channel to the caller
+ delegate().position(newPosition);
+ return this;
+ }
+
+ @Override
+ public long size() throws IOException
+ {
+ return delegate().size();
+ }
+
+ /**
+ * Truncates the delegate's file to {@code size}.
+ *
+ * @param size the new size, forwarded unchanged to the delegate
+ * @return this forwarding channel, so that chained calls keep going through the wrapper
+ * @throws IOException if the delegate fails to truncate
+ */
+ @Override
+ public FileChannel truncate(long size) throws IOException
+ {
+ // FileChannel#truncate(long) is specified to return "this file channel"; returning the
+ // delegate's result would hand the unwrapped channel to the caller
+ delegate().truncate(size);
+ return this;
+ }
+
+ @Override
+ public void force(boolean metaData) throws IOException
+ {
+ delegate().force(metaData);
+ }
+
+ @Override
+ public long transferTo(long position, long count, WritableByteChannel
target) throws IOException
+ {
+ return delegate().transferTo(position, count, target);
+ }
+
+ @Override
+ public long transferFrom(ReadableByteChannel src, long position, long
count) throws IOException
+ {
+ return delegate().transferFrom(src, position, count);
+ }
+
+ @Override
+ public int read(ByteBuffer dst, long position) throws IOException
+ {
+ return delegate().read(dst, position);
+ }
+
+ @Override
+ public int write(ByteBuffer src, long position) throws IOException
+ {
+ return delegate().write(src, position);
+ }
+
+ @Override
+ public MappedByteBuffer map(MapMode mode, long position, long size) throws
IOException
+ {
+ return delegate().map(mode, position, size);
+ }
+
+ @Override
+ public FileLock lock(long position, long size, boolean shared) throws
IOException
+ {
+ return delegate().lock(position, size, shared);
+ }
+
+ @Override
+ public FileLock tryLock(long position, long size, boolean shared) throws
IOException
+ {
+ return delegate().tryLock(position, size, shared);
+ }
+
+ @Override
+ protected void implCloseChannel() throws IOException
+ {
+ // .close() and .isOpen() are "final", so we need to leverage implCloseChannel
+ // to close the underlying channel
+ delegate().close();
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystem.java
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystem.java
new file mode 100644
index 0000000000..39c682ca7f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystem.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.nio.file.WatchService;
+import java.nio.file.attribute.UserPrincipalLookupService;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+
+public class ForwardingFileSystem extends FileSystem
+{
+ protected final FileSystem delegate;
+
+ public ForwardingFileSystem(FileSystem delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ protected FileSystem delegate()
+ {
+ return delegate;
+ }
+
+ protected Path wrap(Path p)
+ {
+ return p;
+ }
+
+ protected Path unwrap(Path p)
+ {
+ return p;
+ }
+
+ @Override
+ public FileSystemProvider provider()
+ {
+ return delegate().provider();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ delegate().close();
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return delegate().isOpen();
+ }
+
+ @Override
+ public boolean isReadOnly()
+ {
+ return delegate().isReadOnly();
+ }
+
+ @Override
+ public String getSeparator()
+ {
+ return delegate().getSeparator();
+ }
+
+ @Override
+ public Iterable<Path> getRootDirectories()
+ {
+ return Iterables.transform(delegate().getRootDirectories(),
this::wrap);
+ }
+
+ @Override
+ public Iterable<FileStore> getFileStores()
+ {
+ return delegate().getFileStores();
+ }
+
+ @Override
+ public Set<String> supportedFileAttributeViews()
+ {
+ return delegate().supportedFileAttributeViews();
+ }
+
+ @Override
+ public Path getPath(String first, String... more)
+ {
+ return wrap(delegate().getPath(first, more));
+ }
+
+ @Override
+ public PathMatcher getPathMatcher(String syntaxAndPattern)
+ {
+ PathMatcher matcher = delegate().getPathMatcher(syntaxAndPattern);
+ return path -> matcher.matches(unwrap(path));
+ }
+
+ @Override
+ public UserPrincipalLookupService getUserPrincipalLookupService()
+ {
+ return delegate().getUserPrincipalLookupService();
+ }
+
+ @Override
+ public WatchService newWatchService() throws IOException
+ {
+ return delegate().newWatchService();
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystemProvider.java
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystemProvider.java
new file mode 100644
index 0000000000..fd1f6d90b2
--- /dev/null
+++
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingFileSystemProvider.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.AccessMode;
+import java.nio.file.CopyOption;
+import java.nio.file.DirectoryStream;
+import java.nio.file.FileStore;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.collect.Iterators;
+
+public class ForwardingFileSystemProvider extends FileSystemProvider
+{
+ protected final FileSystemProvider delegate;
+
+ public ForwardingFileSystemProvider(FileSystemProvider delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ protected FileSystemProvider delegate()
+ {
+ return delegate;
+ }
+
+ protected Path wrap(Path p)
+ {
+ return p;
+ }
+
+ protected Path unwrap(Path p)
+ {
+ return p;
+ }
+
+ @Override
+ public String getScheme()
+ {
+ return delegate().getScheme();
+ }
+
+ @Override
+ public FileSystem newFileSystem(URI uri, Map<String, ?> env) throws
IOException
+ {
+ return delegate().newFileSystem(uri, env);
+ }
+
+ @Override
+ public FileSystem getFileSystem(URI uri)
+ {
+ return delegate().getFileSystem(uri);
+ }
+
+ @Override
+ public Path getPath(URI uri)
+ {
+ return wrap(delegate().getPath(uri));
+ }
+
+ @Override
+ public SeekableByteChannel newByteChannel(Path path, Set<? extends
OpenOption> options, FileAttribute<?>... attrs) throws IOException
+ {
+ return delegate().newByteChannel(unwrap(path), options, attrs);
+ }
+
+ /**
+ * Opens a directory stream on the delegate for the unwrapped {@code dir}.
+ * Entries are passed through {@link #wrap(Path)} both when they are tested by
+ * {@code filter} and when they are returned by the stream's iterator.
+ *
+ * @param dir the directory to list; unwrapped before reaching the delegate
+ * @param filter the caller's entry filter; a null filter is forwarded to the delegate as null
+ * @return a stream whose iterator yields wrapped entries and whose close() closes the delegate stream
+ * @throws IOException if the delegate fails to open the directory
+ */
+ @Override
+ public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException
+ {
+ // the iterator hands out wrapped entries, so the caller's filter must be evaluated against
+ // wrapped entries too; otherwise it would see the delegate's raw paths
+ DirectoryStream.Filter<Path> wrappedFilter = filter == null ? null : entry -> filter.accept(wrap(entry));
+ DirectoryStream<Path> stream = delegate().newDirectoryStream(unwrap(dir), wrappedFilter);
+ return new DirectoryStream<Path>()
+ {
+ @Override
+ public Iterator<Path> iterator()
+ {
+ return Iterators.transform(stream.iterator(), ForwardingFileSystemProvider.this::wrap);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ stream.close();
+ }
+ };
+ }
+
+ @Override
+ public void createDirectory(Path dir, FileAttribute<?>... attrs) throws
IOException
+ {
+ delegate().createDirectory(unwrap(dir), attrs);
+ }
+
+ @Override
+ public void delete(Path path) throws IOException
+ {
+ delegate().delete(unwrap(path));
+ }
+
+ @Override
+ public void copy(Path source, Path target, CopyOption... options) throws
IOException
+ {
+ delegate().copy(unwrap(source), unwrap(target), options);
+ }
+
+ @Override
+ public void move(Path source, Path target, CopyOption... options) throws
IOException
+ {
+ delegate().move(unwrap(source), unwrap(target), options);
+ }
+
+ @Override
+ public boolean isSameFile(Path path, Path path2) throws IOException
+ {
+ return delegate().isSameFile(unwrap(path), unwrap(path2));
+ }
+
+ @Override
+ public boolean isHidden(Path path) throws IOException
+ {
+ return delegate().isHidden(unwrap(path));
+ }
+
+ @Override
+ public FileStore getFileStore(Path path) throws IOException
+ {
+ return delegate().getFileStore(unwrap(path));
+ }
+
+ @Override
+ public void checkAccess(Path path, AccessMode... modes) throws IOException
+ {
+ delegate().checkAccess(unwrap(path), modes);
+ }
+
+ @Override
+ public <V extends FileAttributeView> V getFileAttributeView(Path path,
Class<V> type, LinkOption... options)
+ {
+ return delegate().getFileAttributeView(unwrap(path), type, options);
+ }
+
+ @Override
+ public <A extends BasicFileAttributes> A readAttributes(Path path,
Class<A> type, LinkOption... options) throws IOException
+ {
+ return delegate().readAttributes(unwrap(path), type, options);
+ }
+
+ @Override
+ public Map<String, Object> readAttributes(Path path, String attributes,
LinkOption... options) throws IOException
+ {
+ return delegate().readAttributes(unwrap(path), attributes, options);
+ }
+
+ @Override
+ public void setAttribute(Path path, String attribute, Object value,
LinkOption... options) throws IOException
+ {
+ delegate().setAttribute(unwrap(path), attribute, value, options);
+ }
+
+ @Override
+ public FileSystem newFileSystem(Path path, Map<String, ?> env) throws
IOException
+ {
+ return delegate().newFileSystem(unwrap(path), env);
+ }
+
+ @Override
+ public InputStream newInputStream(Path path, OpenOption... options) throws
IOException
+ {
+ return delegate().newInputStream(unwrap(path), options);
+ }
+
+ @Override
+ public OutputStream newOutputStream(Path path, OpenOption... options)
throws IOException
+ {
+ return delegate().newOutputStream(unwrap(path), options);
+ }
+
+ @Override
+ public FileChannel newFileChannel(Path path, Set<? extends OpenOption>
options, FileAttribute<?>... attrs) throws IOException
+ {
+ return delegate().newFileChannel(unwrap(path), options, attrs);
+ }
+
+ @Override
+ public AsynchronousFileChannel newAsynchronousFileChannel(Path path, Set<?
extends OpenOption> options, ExecutorService executor, FileAttribute<?>...
attrs) throws IOException
+ {
+ return delegate().newAsynchronousFileChannel(unwrap(path), options,
executor, attrs);
+ }
+
+ /**
+ * Creates a symbolic link on the delegate provider.
+ *
+ * @param link the link to create; unwrapped before reaching the delegate
+ * @param target the link target; unwrapped before reaching the delegate
+ * @param attrs attributes forwarded unchanged
+ * @throws IOException if the delegate fails to create the link
+ */
+ @Override
+ public void createSymbolicLink(Path link, Path target, FileAttribute<?>... attrs) throws IOException
+ {
+ // unwrap the target as well, matching createLink(), copy() and move(): the delegate
+ // provider only understands its own Path implementation
+ delegate().createSymbolicLink(unwrap(link), unwrap(target), attrs);
+ }
+
+ @Override
+ public void createLink(Path link, Path existing) throws IOException
+ {
+ delegate().createLink(unwrap(link), unwrap(existing));
+ }
+
+ @Override
+ public boolean deleteIfExists(Path path) throws IOException
+ {
+ return delegate().deleteIfExists(unwrap(path));
+ }
+
+ @Override
+ public Path readSymbolicLink(Path link) throws IOException
+ {
+ return wrap(delegate().readSymbolicLink(unwrap(link)));
+ }
+}
diff --git a/test/unit/org/apache/cassandra/io/filesystem/ForwardingPath.java
b/test/unit/org/apache/cassandra/io/filesystem/ForwardingPath.java
new file mode 100644
index 0000000000..7dd0037325
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/filesystem/ForwardingPath.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.FileSystem;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+
+public class ForwardingPath implements Path
+{
+ protected final Path delegate;
+
+ public ForwardingPath(Path delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ protected Path delegate()
+ {
+ return delegate;
+ }
+
+ protected Path wrap(Path a)
+ {
+ return a;
+ }
+
+ protected Path unwrap(Path p)
+ {
+ return p;
+ }
+
+ @Override
+ public FileSystem getFileSystem()
+ {
+ return delegate().getFileSystem();
+ }
+
+ @Override
+ public boolean isAbsolute()
+ {
+ return delegate().isAbsolute();
+ }
+
+ /**
+ * @return the delegate's root component passed through {@link #wrap(Path)}, or {@code null}
+ * when the delegate has no root component
+ */
+ @Override
+ public Path getRoot()
+ {
+ // Path#getRoot may return null; like getParent(), never hand null to the overridable wrap()
+ Path root = delegate().getRoot();
+ if (root == null)
+ return null;
+ return wrap(root);
+ }
+
+ /**
+ * @return the delegate's file name passed through {@link #wrap(Path)}, or {@code null}
+ * when the delegate has zero name elements
+ */
+ @Override
+ public Path getFileName()
+ {
+ // Path#getFileName may return null; like getParent(), never hand null to the overridable wrap()
+ Path fileName = delegate().getFileName();
+ if (fileName == null)
+ return null;
+ return wrap(fileName);
+ }
+
+ @Override
+ public Path getParent()
+ {
+ Path parent = delegate().getParent();
+ if (parent == null)
+ return null;
+ return wrap(parent);
+ }
+
+ @Override
+ public int getNameCount()
+ {
+ return delegate().getNameCount();
+ }
+
+ @Override
+ public Path getName(int index)
+ {
+ return wrap(delegate().getName(index));
+ }
+
+ @Override
+ public Path subpath(int beginIndex, int endIndex)
+ {
+ return wrap(delegate().subpath(beginIndex, endIndex));
+ }
+
+ @Override
+ public boolean startsWith(Path other)
+ {
+ return delegate().startsWith(unwrap(other));
+ }
+
+ @Override
+ public boolean startsWith(String other)
+ {
+ return delegate().startsWith(other);
+ }
+
+ @Override
+ public boolean endsWith(Path other)
+ {
+ return delegate().endsWith(unwrap(other));
+ }
+
+ @Override
+ public boolean endsWith(String other)
+ {
+ return delegate().endsWith(other);
+ }
+
+ @Override
+ public Path normalize()
+ {
+ return wrap(delegate().normalize());
+ }
+
+ @Override
+ public Path resolve(Path other)
+ {
+ return wrap(delegate().resolve(unwrap(other)));
+ }
+
+ @Override
+ public Path resolve(String other)
+ {
+ return wrap(delegate().resolve(other));
+ }
+
+ @Override
+ public Path resolveSibling(Path other)
+ {
+ return wrap(delegate().resolveSibling(unwrap(other)));
+ }
+
+ @Override
+ public Path resolveSibling(String other)
+ {
+ return wrap(delegate().resolveSibling(other));
+ }
+
+ @Override
+ public Path relativize(Path other)
+ {
+ return wrap(delegate().relativize(unwrap(other)));
+ }
+
+ @Override
+ public URI toUri()
+ {
+ return delegate().toUri();
+ }
+
+ @Override
+ public Path toAbsolutePath()
+ {
+ return wrap(delegate().toAbsolutePath());
+ }
+
+ @Override
+ public Path toRealPath(LinkOption... options) throws IOException
+ {
+ return wrap(delegate().toRealPath(options));
+ }
+
+ @Override
+ public File toFile()
+ {
+ return delegate().toFile();
+ }
+
+ @Override
+ public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[]
events, WatchEvent.Modifier... modifiers) throws IOException
+ {
+ return delegate().register(watcher, events, modifiers);
+ }
+
+ @Override
+ public WatchKey register(WatchService watcher, WatchEvent.Kind<?>...
events) throws IOException
+ {
+ return delegate().register(watcher, events);
+ }
+
+ @Override
+ public Iterator<Path> iterator()
+ {
+ return Iterators.transform(delegate().iterator(), this::wrap);
+ }
+
+ @Override
+ public int compareTo(Path other)
+ {
+ return delegate().compareTo(unwrap(other));
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ return delegate().equals(obj instanceof Path ? unwrap((Path) obj) :
obj);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return delegate().hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return delegate().toString();
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/io/filesystem/ListenableFileSystem.java
b/test/unit/org/apache/cassandra/io/filesystem/ListenableFileSystem.java
new file mode 100644
index 0000000000..659d34dfe2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/filesystem/ListenableFileSystem.java
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.filesystem;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.spi.FileSystemProvider;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+public class ListenableFileSystem extends ForwardingFileSystem
+{
+ /**
+  * Predicate over a {@link Path}. The filtered {@code onXxx(PathFilter, ...)} registration methods
+  * consult it and only run the callback when {@code accept} returns true.
+  */
+ @FunctionalInterface
+ public interface PathFilter
+ {
+ boolean accept(Path entry) throws IOException;
+ }
+
+ /** Marker supertype of every callback interface accepted by {@code listen(Listener)}. */
+ public interface Listener
+ {
+ }
+
+ /** Notified by the provider's {@code newFileChannel} before the delegate channel is opened. */
+ public interface OnPreOpen extends Listener
+ {
+ void preOpen(Path path, Set<? extends OpenOption> options,
FileAttribute<?>[] attrs) throws IOException;
+ }
+
+ /** Notified after the channel has been opened; {@code channel} is the listenable wrapper handed to the caller. */
+ public interface OnPostOpen extends Listener
+ {
+ void postOpen(Path path, Set<? extends OpenOption> options,
FileAttribute<?>[] attrs, FileChannel channel) throws IOException;
+ }
+
+ /** Notified before a channel read; {@code position} is where the read starts. */
+ public interface OnPreRead extends Listener
+ {
+ void preRead(Path path, FileChannel channel, long position, ByteBuffer
dst) throws IOException;
+ }
+
+ /** Notified after a channel read; {@code read} is the value returned by the underlying read. */
+ public interface OnPostRead extends Listener
+ {
+ void postRead(Path path, FileChannel channel, long position,
ByteBuffer dst, int read) throws IOException;
+ }
+
+ /** Notified before {@code transferTo} is forwarded. */
+ public interface OnPreTransferTo extends Listener
+ {
+ void preTransferTo(Path path, FileChannel channel, long position, long
count, WritableByteChannel target) throws IOException;
+ }
+
+ /** Notified after {@code transferTo}; {@code transfered} is the value it returned. */
+ public interface OnPostTransferTo extends Listener
+ {
+ void postTransferTo(Path path, FileChannel channel, long position,
long count, WritableByteChannel target, long transfered) throws IOException;
+ }
+
+ /** Notified before {@code transferFrom} is forwarded. */
+ public interface OnPreTransferFrom extends Listener
+ {
+ void preTransferFrom(Path path, FileChannel channel,
ReadableByteChannel src, long position, long count) throws IOException;
+ }
+
+ /** Notified after {@code transferFrom}; {@code transfered} is the value it returned. */
+ public interface OnPostTransferFrom extends Listener
+ {
+ void postTransferFrom(Path path, FileChannel channel,
ReadableByteChannel src, long position, long count, long transfered) throws
IOException;
+ }
+
+ /** Notified before a channel write; {@code position} is where the write starts. */
+ public interface OnPreWrite extends Listener
+ {
+ void preWrite(Path path, FileChannel channel, long position,
ByteBuffer src) throws IOException;
+ }
+
+ /** Notified after a channel write; {@code wrote} is the value returned by the underlying write. */
+ public interface OnPostWrite extends Listener
+ {
+ void postWrite(Path path, FileChannel channel, long position,
ByteBuffer src, int wrote) throws IOException;
+ }
+
+ /** Notified before the channel position changes from {@code position} to {@code newPosition}. */
+ public interface OnPrePosition extends Listener
+ {
+ void prePosition(Path path, FileChannel channel, long position, long
newPosition) throws IOException;
+ }
+
+ /** Notified after the position change; {@code position} is still the value observed before the change. */
+ public interface OnPostPosition extends Listener
+ {
+ void postPosition(Path path, FileChannel channel, long position, long
newPosition) throws IOException;
+ }
+
+ /** Notified before a truncate; {@code size} is the current size, {@code targetSize} the requested one. */
+ public interface OnPreTruncate extends Listener
+ {
+ void preTruncate(Path path, FileChannel channel, long size, long
targetSize) throws IOException;
+ }
+
+ /** Notified after a truncate; {@code newSize} is the channel size read back afterwards. */
+ public interface OnPostTruncate extends Listener
+ {
+ void postTruncate(Path path, FileChannel channel, long size, long
targetSize, long newSize) throws IOException;
+ }
+
+ /** Notified before {@code force(metaData)} is forwarded. */
+ public interface OnPreForce extends Listener
+ {
+ void preForce(Path path, FileChannel channel, boolean metaData) throws
IOException;
+ }
+
+ /** Notified after {@code force(metaData)} returned. */
+ public interface OnPostForce extends Listener
+ {
+ void postForce(Path path, FileChannel channel, boolean metaData)
throws IOException;
+ }
+
+ /**
+  * Handle returned by the registration methods; {@code close()} removes the registered listener.
+  * Redeclares {@code close()} without a checked exception.
+  */
+ public interface Unsubscribable extends AutoCloseable
+ {
+ @Override
+ void close();
+ }
+
+ // One registry per listener interface; each is a CopyOnWriteArrayList.
+ private final List<OnPreOpen> onPreOpen = new CopyOnWriteArrayList<>();
+ private final List<OnPostOpen> onPostOpen = new CopyOnWriteArrayList<>();
+ private final List<OnPreTransferTo> onPreTransferTo = new
CopyOnWriteArrayList<>();
+ private final List<OnPostTransferTo> onPostTransferTo = new
CopyOnWriteArrayList<>();
+ private final List<OnPreRead> onPreRead = new CopyOnWriteArrayList<>();
+ private final List<OnPostRead> onPostRead = new CopyOnWriteArrayList<>();
+ private final List<OnPreWrite> onPreWrite = new CopyOnWriteArrayList<>();
+ private final List<OnPostWrite> onPostWrite = new CopyOnWriteArrayList<>();
+ private final List<OnPreTransferFrom> onPreTransferFrom = new
CopyOnWriteArrayList<>();
+ private final List<OnPostTransferFrom> onPostTransferFrom = new
CopyOnWriteArrayList<>();
+
+ private final List<OnPreForce> onPreForce = new CopyOnWriteArrayList<>();
+ private final List<OnPostForce> onPostForce = new CopyOnWriteArrayList<>();
+ private final List<OnPreTruncate> onPreTruncate = new
CopyOnWriteArrayList<>();
+ private final List<OnPostTruncate> onPostTruncate = new
CopyOnWriteArrayList<>();
+ private final List<OnPrePosition> onPrePosition = new
CopyOnWriteArrayList<>();
+ private final List<OnPostPosition> onPostPosition = new
CopyOnWriteArrayList<>();
+ // Every registry above, so remove(Listener) and clearListeners() can sweep them all.
+ private final List<List<? extends Listener>> lists =
Arrays.asList(onPreOpen, onPostOpen,
+
onPreRead, onPostRead,
+
onPreTransferTo, onPostTransferTo,
+
onPreWrite, onPostWrite,
+
onPreTransferFrom, onPostTransferFrom,
+
onPreForce, onPostForce,
+
onPreTruncate, onPostTruncate,
+
onPrePosition, onPostPosition);
+ // Provider returned by provider(); created in the constructor.
+ private final ListenableFileSystemProvider provider;
+
+ /**
+  * Wraps {@code delegate} and builds a {@link ListenableFileSystemProvider} around the provider
+  * reported by the parent class.
+  */
+ public ListenableFileSystem(FileSystem delegate)
+ {
+ super(delegate);
+ this.provider = new ListenableFileSystemProvider(super.provider());
+ }
+
+ public Unsubscribable listen(Listener listener)
+ {
+ List<List<? extends Listener>> matches = new ArrayList<>(1);
+ if (listener instanceof OnPreOpen)
+ {
+ onPreOpen.add((OnPreOpen) listener);
+ matches.add(onPreOpen);
+ }
+ if (listener instanceof OnPostOpen)
+ {
+ onPostOpen.add((OnPostOpen) listener);
+ matches.add(onPostOpen);
+ }
+ if (listener instanceof OnPreRead)
+ {
+ onPreRead.add((OnPreRead) listener);
+ matches.add(onPreRead);
+ }
+ if (listener instanceof OnPostRead)
+ {
+ onPostRead.add((OnPostRead) listener);
+ matches.add(onPostRead);
+ }
+ if (listener instanceof OnPreTransferTo)
+ {
+ onPreTransferTo.add((OnPreTransferTo) listener);
+ matches.add(onPreTransferTo);
+ }
+ if (listener instanceof OnPostTransferTo)
+ {
+ onPostTransferTo.add((OnPostTransferTo) listener);
+ matches.add(onPostTransferTo);
+ }
+ if (listener instanceof OnPreWrite)
+ {
+ onPreWrite.add((OnPreWrite) listener);
+ matches.add(onPreWrite);
+ }
+ if (listener instanceof OnPostWrite)
+ {
+ onPostWrite.add((OnPostWrite) listener);
+ matches.add(onPostWrite);
+ }
+ if (listener instanceof OnPreTransferFrom)
+ {
+ onPreTransferFrom.add((OnPreTransferFrom) listener);
+ matches.add(onPreTransferFrom);
+ }
+ if (listener instanceof OnPostTransferFrom)
+ {
+ onPostTransferFrom.add((OnPostTransferFrom) listener);
+ matches.add(onPostTransferFrom);
+ }
+ if (listener instanceof OnPreForce)
+ {
+ onPreForce.add((OnPreForce) listener);
+ matches.add(onPreForce);
+ }
+ if (listener instanceof OnPostForce)
+ {
+ onPostForce.add((OnPostForce) listener);
+ matches.add(onPostForce);
+ }
+ if (listener instanceof OnPreTruncate)
+ {
+ onPreTruncate.add((OnPreTruncate) listener);
+ matches.add(onPreTruncate);
+ }
+ if (listener instanceof OnPostTruncate)
+ {
+ onPostTruncate.add((OnPostTruncate) listener);
+ matches.add(onPostTruncate);
+ }
+ if (listener instanceof OnPrePosition)
+ {
+ onPrePosition.add((OnPrePosition) listener);
+ matches.add(onPrePosition);
+ }
+ if (listener instanceof OnPostPosition)
+ {
+ onPostPosition.add((OnPostPosition) listener);
+ matches.add(onPostPosition);
+ }
+ if (matches.isEmpty())
+ throw new IllegalArgumentException("Unable to find a listenable
type for " + listener.getClass());
+ return () -> remove(matches, listener);
+ }
+
+ /** Registers {@code callback} to be notified before a channel is opened. */
+ public Unsubscribable onPreOpen(OnPreOpen callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPreOpen(OnPreOpen)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPreOpen(PathFilter filter, OnPreOpen callback)
+ {
+     OnPreOpen filtered = (p, opts, fileAttrs) -> {
+         if (!filter.accept(p))
+             return;
+         callback.preOpen(p, opts, fileAttrs);
+     };
+     return onPreOpen(filtered);
+ }
+
+ /** Registers {@code callback} to be notified after a channel has been opened. */
+ public Unsubscribable onPostOpen(OnPostOpen callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPostOpen(OnPostOpen)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPostOpen(PathFilter filter, OnPostOpen callback)
+ {
+     OnPostOpen filtered = (p, opts, fileAttrs, ch) -> {
+         if (!filter.accept(p))
+             return;
+         callback.postOpen(p, opts, fileAttrs, ch);
+     };
+     return onPostOpen(filtered);
+ }
+
+ /** Registers {@code callback} to be notified before channel reads. */
+ public Unsubscribable onPreRead(OnPreRead callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPreRead(OnPreRead)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPreRead(PathFilter filter, OnPreRead callback)
+ {
+     OnPreRead filtered = (p, ch, pos, buffer) -> {
+         if (!filter.accept(p))
+             return;
+         callback.preRead(p, ch, pos, buffer);
+     };
+     return onPreRead(filtered);
+ }
+
+ /** Registers {@code callback} to be notified after channel reads. */
+ public Unsubscribable onPostRead(OnPostRead callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPostRead(OnPostRead)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPostRead(PathFilter filter, OnPostRead callback)
+ {
+     OnPostRead filtered = (p, ch, pos, buffer, bytesRead) -> {
+         if (!filter.accept(p))
+             return;
+         callback.postRead(p, ch, pos, buffer, bytesRead);
+     };
+     return onPostRead(filtered);
+ }
+
+ /** Registers {@code callback} to be notified before {@code transferTo} calls. */
+ public Unsubscribable onPreTransferTo(OnPreTransferTo callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPreTransferTo(OnPreTransferTo)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPreTransferTo(PathFilter filter, OnPreTransferTo callback)
+ {
+     OnPreTransferTo filtered = (p, ch, pos, len, sink) -> {
+         if (!filter.accept(p))
+             return;
+         callback.preTransferTo(p, ch, pos, len, sink);
+     };
+     return onPreTransferTo(filtered);
+ }
+
+ /** Registers {@code callback} to be notified after {@code transferTo} calls. */
+ public Unsubscribable onPostTransferTo(OnPostTransferTo callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPostTransferTo(OnPostTransferTo)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPostTransferTo(PathFilter filter, OnPostTransferTo callback)
+ {
+     OnPostTransferTo filtered = (p, ch, pos, len, sink, moved) -> {
+         if (!filter.accept(p))
+             return;
+         callback.postTransferTo(p, ch, pos, len, sink, moved);
+     };
+     return onPostTransferTo(filtered);
+ }
+
+ /** Registers {@code callback} to be notified before {@code transferFrom} calls. */
+ public Unsubscribable onPreTransferFrom(OnPreTransferFrom callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPreTransferFrom(OnPreTransferFrom)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPreTransferFrom(PathFilter filter, OnPreTransferFrom callback)
+ {
+     OnPreTransferFrom filtered = (p, ch, source, pos, len) -> {
+         if (!filter.accept(p))
+             return;
+         callback.preTransferFrom(p, ch, source, pos, len);
+     };
+     return onPreTransferFrom(filtered);
+ }
+
+ /** Registers {@code callback} to be notified after {@code transferFrom} calls. */
+ public Unsubscribable onPostTransferFrom(OnPostTransferFrom callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPostTransferFrom(OnPostTransferFrom)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPostTransferFrom(PathFilter filter, OnPostTransferFrom callback)
+ {
+     OnPostTransferFrom filtered = (p, ch, source, pos, len, moved) -> {
+         if (!filter.accept(p))
+             return;
+         callback.postTransferFrom(p, ch, source, pos, len, moved);
+     };
+     return onPostTransferFrom(filtered);
+ }
+
+ /** Registers {@code callback} to be notified before channel writes. */
+ public Unsubscribable onPreWrite(OnPreWrite callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPreWrite(OnPreWrite)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPreWrite(PathFilter filter, OnPreWrite callback)
+ {
+     OnPreWrite filtered = (p, ch, pos, buffer) -> {
+         if (!filter.accept(p))
+             return;
+         callback.preWrite(p, ch, pos, buffer);
+     };
+     return onPreWrite(filtered);
+ }
+
+ /** Registers {@code callback} to be notified after channel writes. */
+ public Unsubscribable onPostWrite(OnPostWrite callback)
+ {
+     return listen(callback);
+ }
+
+ /** Like {@link #onPostWrite(OnPostWrite)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPostWrite(PathFilter filter, OnPostWrite callback)
+ {
+     OnPostWrite filtered = (p, ch, pos, buffer, bytesWritten) -> {
+         if (!filter.accept(p))
+             return;
+         callback.postWrite(p, ch, pos, buffer, bytesWritten);
+     };
+     return onPostWrite(filtered);
+ }
+
+ /** Registers {@code callbackk} to be notified before the channel position is changed. */
+ public Unsubscribable onPrePosition(OnPrePosition callbackk)
+ {
+     return listen(callbackk);
+ }
+
+ /** Like {@link #onPrePosition(OnPrePosition)}, but {@code callbackk} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPrePosition(PathFilter filter, OnPrePosition callbackk)
+ {
+     OnPrePosition filtered = (p, ch, from, to) -> {
+         if (!filter.accept(p))
+             return;
+         callbackk.prePosition(p, ch, from, to);
+     };
+     return onPrePosition(filtered);
+ }
+
+ /** Registers {@code callbackk} to be notified after the channel position was changed. */
+ public Unsubscribable onPostPosition(OnPostPosition callbackk)
+ {
+     return listen(callbackk);
+ }
+
+ /** Like {@link #onPostPosition(OnPostPosition)}, but {@code callbackk} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPostPosition(PathFilter filter, OnPostPosition callbackk)
+ {
+     OnPostPosition filtered = (p, ch, from, to) -> {
+         if (!filter.accept(p))
+             return;
+         callbackk.postPosition(p, ch, from, to);
+     };
+     return onPostPosition(filtered);
+ }
+
+ /** Registers {@code callbackk} to be notified before a channel is truncated. */
+ public Unsubscribable onPreTruncate(OnPreTruncate callbackk)
+ {
+     return listen(callbackk);
+ }
+
+ /** Like {@link #onPreTruncate(OnPreTruncate)}, but {@code callbackk} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPreTruncate(PathFilter filter, OnPreTruncate callbackk)
+ {
+     OnPreTruncate filtered = (p, ch, currentSize, requestedSize) -> {
+         if (!filter.accept(p))
+             return;
+         callbackk.preTruncate(p, ch, currentSize, requestedSize);
+     };
+     return onPreTruncate(filtered);
+ }
+
+ /** Registers {@code callbackk} to be notified after a channel was truncated. */
+ public Unsubscribable onPostTruncate(OnPostTruncate callbackk)
+ {
+     return listen(callbackk);
+ }
+
+ /** Like {@link #onPostTruncate(OnPostTruncate)}, but {@code callbackk} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPostTruncate(PathFilter filter, OnPostTruncate callbackk)
+ {
+     OnPostTruncate filtered = (p, ch, sizeBefore, requestedSize, sizeAfter) -> {
+         if (!filter.accept(p))
+             return;
+         callbackk.postTruncate(p, ch, sizeBefore, requestedSize, sizeAfter);
+     };
+     return onPostTruncate(filtered);
+ }
+
+ /** Registers {@code callbackk} to be notified before {@code force} calls. */
+ public Unsubscribable onPreForce(OnPreForce callbackk)
+ {
+     return listen(callbackk);
+ }
+
+ /** Like {@link #onPreForce(OnPreForce)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPreForce(PathFilter filter, OnPreForce callback)
+ {
+     OnPreForce filtered = (p, ch, meta) -> {
+         if (!filter.accept(p))
+             return;
+         callback.preForce(p, ch, meta);
+     };
+     return onPreForce(filtered);
+ }
+
+ /** Registers {@code callbackk} to be notified after {@code force} calls. */
+ public Unsubscribable onPostForce(OnPostForce callbackk)
+ {
+     return listen(callbackk);
+ }
+
+ /** Like {@link #onPostForce(OnPostForce)}, but {@code callback} only runs for paths accepted by {@code filter}. */
+ public Unsubscribable onPostForce(PathFilter filter, OnPostForce callback)
+ {
+     OnPostForce filtered = (p, ch, meta) -> {
+         if (!filter.accept(p))
+             return;
+         callback.postForce(p, ch, meta);
+     };
+     return onPostForce(filtered);
+ }
+
+ /** Removes {@code listener} from every registry of this file system. */
+ public void remove(Listener listener)
+ {
+     remove(lists, listener);
+ }
+
+ /** Removes {@code listener} from each of the given registries. */
+ private static void remove(List<List<? extends Listener>> lists, Listener listener)
+ {
+     for (List<? extends Listener> registry : lists)
+         registry.remove(listener);
+ }
+
+ /** Empties every listener registry. */
+ public void clearListeners()
+ {
+     for (List<? extends Listener> registry : lists)
+         registry.clear();
+ }
+
+ /** Action applied to one listener by {@code notifyListeners}; it is allowed to throw {@link IOException}. */
+ private interface ListenerAction<T>
+ {
+ void accept(T value) throws IOException;
+ }
+
+ /**
+  * Applies {@code fn} to each listener in iteration order. The first exception thrown by
+  * {@code fn} propagates to the caller, so the remaining listeners are not invoked.
+  */
+ private <T> void notifyListeners(List<T> listeners, ListenerAction<T> fn) throws IOException
+ {
+     Iterator<T> remaining = listeners.iterator();
+     while (remaining.hasNext())
+         fn.accept(remaining.next());
+ }
+
+ /** Returns {@code p} itself when it is already a {@code ListenablePath}, otherwise a new {@code ListenablePath} around it. */
+ @Override
+ protected Path wrap(Path p)
+ {
+     if (p instanceof ListenablePath)
+         return p;
+     return new ListenablePath(p);
+ }
+
+ /** Returns the delegate of a {@code ListenablePath}; any other path is returned as-is. */
+ @Override
+ protected Path unwrap(Path p)
+ {
+     if (!(p instanceof ListenablePath))
+         return p;
+     return ((ListenablePath) p).delegate;
+ }
+
+ /** Returns the listenable provider created in the constructor (return type narrowed to the concrete class). */
+ @Override
+ public ListenableFileSystemProvider provider()
+ {
+ return provider;
+ }
+
+
+ /**
+  * Provider that routes stream and byte-channel creation through {@link #newFileChannel}, so the
+  * open listeners fire and callers receive a {@code ListenableFileChannel}. Path wrapping and
+  * unwrapping is shared with the enclosing file system.
+  */
+ private class ListenableFileSystemProvider extends ForwardingFileSystemProvider
+ {
+     ListenableFileSystemProvider(FileSystemProvider delegate)
+     {
+         super(delegate);
+     }
+
+     @Override
+     protected Path wrap(Path a)
+     {
+         return ListenableFileSystem.this.wrap(a);
+     }
+
+     @Override
+     protected Path unwrap(Path p)
+     {
+         return ListenableFileSystem.this.unwrap(p);
+     }
+
+     /**
+      * Opens an output stream backed by a listenable channel. With no options the file is opened
+      * with CREATE and TRUNCATE_EXISTING; WRITE is always added.
+      *
+      * @throws IllegalArgumentException if {@code options} contains READ
+      */
+     @Override
+     public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException
+     {
+         int len = options.length;
+         Set<OpenOption> opts = new HashSet<>(len + 3);
+         if (len == 0)
+         {
+             opts.add(StandardOpenOption.CREATE);
+             opts.add(StandardOpenOption.TRUNCATE_EXISTING);
+         }
+         else
+         {
+             for (OpenOption opt : options)
+             {
+                 if (opt == StandardOpenOption.READ)
+                     throw new IllegalArgumentException("READ not allowed");
+                 opts.add(opt);
+             }
+         }
+         opts.add(StandardOpenOption.WRITE);
+         return Channels.newOutputStream(newFileChannel(path, opts));
+     }
+
+     /**
+      * Opens an input stream backed by a listenable channel.
+      *
+      * @throws UnsupportedOperationException if {@code options} contains APPEND or WRITE
+      */
+     @Override
+     public InputStream newInputStream(Path path, OpenOption... options) throws IOException
+     {
+         for (OpenOption opt : options)
+         {
+             // All OpenOption values except for APPEND and WRITE are allowed
+             if (opt == StandardOpenOption.APPEND ||
+                 opt == StandardOpenOption.WRITE)
+                 throw new UnsupportedOperationException("'" + opt + "' not allowed");
+         }
+         Set<OpenOption> opts = new HashSet<>(Arrays.asList(options));
+         return Channels.newInputStream(newFileChannel(path, opts));
+     }
+
+     @Override
+     public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException
+     {
+         return newFileChannel(path, options, attrs);
+     }
+
+     /**
+      * Opens the delegate channel for the unwrapped path and returns it wrapped in a
+      * {@code ListenableFileChannel}. Pre-open listeners run before the delegate is touched;
+      * post-open listeners run once the channel exists.
+      *
+      * @throws IOException if a listener or the delegate provider throws
+      */
+     @Override
+     public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException
+     {
+         notifyListeners(onPreOpen, l -> l.preOpen(path, options, attrs));
+         ListenableFileChannel channel = new ListenableFileChannel(path, delegate().newFileChannel(unwrap(path), options, attrs));
+         try
+         {
+             notifyListeners(onPostOpen, l -> l.postOpen(path, options, attrs, channel));
+         }
+         catch (Throwable t)
+         {
+             // The caller never receives the channel when a post-open listener fails, so nobody
+             // else could close it; release it here and surface the original failure.
+             try
+             {
+                 channel.close();
+             }
+             catch (Throwable closeFailure)
+             {
+                 t.addSuppressed(closeFailure);
+             }
+             throw t;
+         }
+         return channel;
+     }
+
+     @Override
+     public AsynchronousFileChannel newAsynchronousFileChannel(Path path, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs) throws IOException
+     {
+         throw new UnsupportedOperationException("TODO");
+     }
+
+     // block the APIs that try to switch FileSystem based off scheme
+     @Override
+     public FileSystem newFileSystem(URI uri, Map<String, ?> env) throws IOException
+     {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public FileSystem newFileSystem(Path path, Map<String, ?> env) throws IOException
+     {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public String getScheme()
+     {
+         throw new UnsupportedOperationException();
+     }
+
+     @Override
+     public FileSystem getFileSystem(URI uri)
+     {
+         throw new UnsupportedOperationException();
+     }
+ }
+
+ private class ListenablePath extends ForwardingPath
+ {
+ public ListenablePath(Path delegate)
+ {
+ super(delegate);
+ }
+
+ @Override
+ protected Path wrap(Path a)
+ {
+ return ListenableFileSystem.this.wrap(a);
+ }
+
+ @Override
+ protected Path unwrap(Path p)
+ {
+ return ListenableFileSystem.this.unwrap(p);
+ }
+
+ @Override
+ public FileSystem getFileSystem()
+ {
+ return ListenableFileSystem.this;
+ }
+
+ @Override
+ public File toFile()
+ {
+ if (delegate().getFileSystem() == FileSystems.getDefault())
+ return delegate().toFile();
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class ListenableFileChannel extends ForwardingFileChannel
+ {
+ // The READ_WRITE mapping created by map(), if any; map() sets it and implCloseChannel() clears it.
+ private final AtomicReference<Mapped> mutable = new
AtomicReference<>();
+ // Path passed to every listener callback fired by this channel.
+ private final Path path;
+
+ /** Wraps {@code delegate}; {@code path} is only kept so it can be reported to listeners. */
+ ListenableFileChannel(Path path, FileChannel delegate)
+ {
+ super(delegate);
+ this.path = path;
+ }
+
+ /**
+  * Relative read. The channel position is sampled before the read and that same value is
+  * reported to both the pre-read and post-read listeners.
+  */
+ @Override
+ public int read(ByteBuffer dst) throws IOException
+ {
+     final long startedAt = position();
+     notifyListeners(onPreRead, listener -> listener.preRead(path, this, startedAt, dst));
+     final int count = super.read(dst);
+     notifyListeners(onPostRead, listener -> listener.postRead(path, this, startedAt, dst, count));
+     return count;
+ }
+
+ /** Positional read; listeners are notified before and after with the requested {@code position}. */
+ @Override
+ public int read(ByteBuffer dst, long position) throws IOException
+ {
+     notifyListeners(onPreRead, listener -> listener.preRead(path, this, position, dst));
+     final int count = super.read(dst, position);
+     notifyListeners(onPostRead, listener -> listener.postRead(path, this, position, dst, count));
+     return count;
+ }
+
+ /**
+  * Relative write. The channel position is sampled before the write and that same value is
+  * reported to both the pre-write and post-write listeners.
+  */
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+     final long startedAt = position();
+     notifyListeners(onPreWrite, listener -> listener.preWrite(path, this, startedAt, src));
+     final int count = super.write(src);
+     notifyListeners(onPostWrite, listener -> listener.postWrite(path, this, startedAt, src, count));
+     return count;
+ }
+
+ /** Positional write; listeners are notified before and after with the requested {@code position}. */
+ @Override
+ public int write(ByteBuffer src, long position) throws IOException
+ {
+     notifyListeners(onPreWrite, listener -> listener.preWrite(path, this, position, src));
+     final int count = super.write(src, position);
+     notifyListeners(onPostWrite, listener -> listener.postWrite(path, this, position, src, count));
+     return count;
+ }
+
+ /**
+  * Moves the channel position. Both listener kinds receive the position sampled before the
+  * move together with {@code newPosition}; this channel (not the delegate) is returned.
+  */
+ @Override
+ public FileChannel position(long newPosition) throws IOException
+ {
+     final long before = position();
+     notifyListeners(onPrePosition, listener -> listener.prePosition(path, this, before, newPosition));
+     super.position(newPosition);
+     notifyListeners(onPostPosition, listener -> listener.postPosition(path, this, before, newPosition));
+     return this;
+ }
+
+ /**
+  * Truncates the channel. The size is read before and after the operation so the post-truncate
+  * listeners see the previous size, the requested size and the resulting size.
+  */
+ @Override
+ public FileChannel truncate(long size) throws IOException
+ {
+     final long sizeBefore = this.size();
+     notifyListeners(onPreTruncate, listener -> listener.preTruncate(path, this, sizeBefore, size));
+     super.truncate(size);
+     final long sizeAfter = this.size();
+     notifyListeners(onPostTruncate, listener -> listener.postTruncate(path, this, sizeBefore, size, sizeAfter));
+     return this;
+ }
+
+ /** Forwards {@code force}, notifying the force listeners immediately before and after. */
+ @Override
+ public void force(boolean metaData) throws IOException
+ {
+     notifyListeners(onPreForce, listener -> listener.preForce(path, this, metaData));
+     super.force(metaData);
+     notifyListeners(onPostForce, listener -> listener.postForce(path, this, metaData));
+ }
+
+ /** Forwards {@code transferTo}; post listeners additionally receive the value the transfer returned. */
+ @Override
+ public long transferTo(long position, long count, WritableByteChannel target) throws IOException
+ {
+     notifyListeners(onPreTransferTo, listener -> listener.preTransferTo(path, this, position, count, target));
+     final long moved = super.transferTo(position, count, target);
+     notifyListeners(onPostTransferTo, listener -> listener.postTransferTo(path, this, position, count, target, moved));
+     return moved;
+ }
+
+ /** Forwards {@code transferFrom}; post listeners additionally receive the value the transfer returned. */
+ @Override
+ public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException
+ {
+     notifyListeners(onPreTransferFrom, listener -> listener.preTransferFrom(path, this, src, position, count));
+     final long moved = super.transferFrom(src, position, count);
+     notifyListeners(onPostTransferFrom, listener -> listener.postTransferFrom(path, this, src, position, count, moved));
+     return moved;
+ }
+
+ /**
+  * Emulates a memory mapping by allocating a direct buffer of {@code size} bytes and copying
+  * the file content starting at {@code position} into it.
+  *
+  * READ_ONLY: the buffer is filled from the channel and a no-op Runnable is attached.
+  * READ_WRITE: the buffer is filled only when the file has bytes past {@code position}; the
+  * attached Runnable writes the whole buffer back to the channel starting at {@code position}.
+  * Only one READ_WRITE mapping per channel is accepted.
+  *
+  * NOTE(review): the attached Runnable is presumably run when the buffer is forced/synced by
+  * code outside this class - confirm against the users of MemoryUtil.setAttachment.
+  *
+  * @throws UnsupportedOperationException for a second READ_WRITE mapping or any other mode
+  * @throws ArithmeticException if {@code size} does not fit in an int (Math.toIntExact)
+  */
+ @Override
+ public MappedByteBuffer map(MapMode mode, long position, long size)
throws IOException
+ {
+ // this behavior isn't 100% correct... if you mix access with FileChannel and ByteBuffer you will get different
+ // results than with a real mmap solution... This limitation is due to ByteBuffer being private, so not able
+ // to create custom BBs to mimic this access...
+ if (mode == MapMode.READ_WRITE && mutable.get() != null)
+ throw new UnsupportedOperationException("map called twice with
mode READ_WRITE; first was " + mutable.get() + ", now " + new Mapped(mode,
null, position, Math.toIntExact(size)));
+
+ int isize = Math.toIntExact(size);
+ MappedByteBuffer bb = (MappedByteBuffer)
ByteBuffer.allocateDirect(isize);
+
+ Mapped mapped = new Mapped(mode, bb, position, isize);
+ if (mode == MapMode.READ_ONLY)
+ {
+ ByteBufferUtil.readFully(this, mapped.bb, position);
+ mapped.bb.flip();
+
+ // nothing to write back for a read-only mapping
+ Runnable forcer = () -> {
+ };
+ MemoryUtil.setAttachment(bb, forcer);
+ }
+ else if (mode == MapMode.READ_WRITE)
+ {
+ // only pre-populate when the file already has content at or after position
+ if (delegate().size() - position > 0)
+ {
+ ByteBufferUtil.readFully(this, mapped.bb, position);
+ mapped.bb.flip();
+ }
+ // with real files the FD gets copied so the close of the channel does not block the BB mutation
+ // from flushing... it's possible to support this use case, but kept things simpler for now by
+ // failing if the backing channel was closed.
+ Runnable forcer = () -> {
+ // work on a duplicate so the caller-visible position/limit of bb are left alone
+ ByteBuffer local = bb.duplicate();
+ local.position(0);
+ long pos = position;
+ try
+ {
+ while (local.hasRemaining())
+ {
+ int wrote = write(local, pos);
+ if (wrote == -1)
+ throw new EOFException();
+ pos += wrote;
+ }
+ }
+ catch (IOException e)
+ {
+ throw new UncheckedIOException(e);
+ }
+ };
+ MemoryUtil.setAttachment(bb, forcer);
+ // second line of defence against a concurrent READ_WRITE map() that passed the check above
+ if (!mutable.compareAndSet(null, mapped))
+ throw new UnsupportedOperationException("map called
twice");
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unsupported mode: " +
mode);
+ }
+ return mapped.bb;
+ }
+
+ /**
+  * Closes the underlying channel and forgets the READ_WRITE mapping. The mapping reference is
+  * cleared in a {@code finally} block because a channel's close hook runs only once: if the
+  * parent close throws, there is no later opportunity to release the reference.
+  */
+ @Override
+ protected void implCloseChannel() throws IOException
+ {
+     try
+     {
+         super.implCloseChannel();
+     }
+     finally
+     {
+         mutable.set(null);
+     }
+ }
+ }
+
+ /** Immutable record of one emulated mapping: its mode, backing buffer, file position and size. */
+ private static class Mapped
+ {
+     final FileChannel.MapMode mode;
+     final MappedByteBuffer bb;
+     final long position;
+     final int size;
+
+     private Mapped(FileChannel.MapMode mode, MappedByteBuffer bb, long position, int size)
+     {
+         this.mode = mode;
+         this.bb = bb;
+         this.position = position;
+         this.size = size;
+     }
+
+     /** Describes the mapping by mode, position and size; the buffer itself is not printed. */
+     @Override
+     public String toString()
+     {
+         StringBuilder text = new StringBuilder("Mapped{");
+         text.append("mode=").append(mode);
+         text.append(", position=").append(position);
+         text.append(", size=").append(size);
+         text.append('}');
+         return text.toString();
+     }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/io/util/FileSystems.java
b/test/unit/org/apache/cassandra/io/util/FileSystems.java
new file mode 100644
index 0000000000..fcd5db7dac
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/FileSystems.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.file.FileSystem;
+
+import com.google.common.base.StandardSystemProperty;
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+
+import org.apache.cassandra.io.filesystem.ForwardingFileSystem;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+
+public class FileSystems
+{
+ public static ListenableFileSystem newGlobalInMemoryFileSystem()
+ {
+ return global(jimfs());
+ }
+
+ public static ListenableFileSystem global()
+ {
+ return global(File.unsafeGetFilesystem());
+ }
+
+ public static ListenableFileSystem global(FileSystem real)
+ {
+ FileSystem current = File.unsafeGetFilesystem();
+ ListenableFileSystem fs = new ListenableFileSystem(new
ForwardingFileSystem(real)
+ {
+ @Override
+ public void close() throws IOException
+ {
+ try
+ {
+ super.close();
+ }
+ finally
+ {
+ File.unsafeSetFilesystem(current);
+ }
+ }
+ });
+ File.unsafeSetFilesystem(fs);
+ return fs;
+ }
+
+ public static FileSystem jimfs()
+ {
+ return Jimfs.newFileSystem(jimfsConfig());
+ }
+
+ public static FileSystem jimfs(String name)
+ {
+ return Jimfs.newFileSystem(name, jimfsConfig());
+ }
+
+ private static Configuration jimfsConfig()
+ {
+ return Configuration.unix().toBuilder()
+ .setMaxSize(4L << 30).setBlockSize(512)
+ .build();
+ }
+
+ public static File maybeCreateTmp()
+ {
+ File dir = new File(StandardSystemProperty.JAVA_IO_TMPDIR.value());
+ if (!dir.exists())
+ dir.tryCreateDirectories();
+ return dir;
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/schema/RemoveWithoutDroppingTest.java
b/test/unit/org/apache/cassandra/schema/RemoveWithoutDroppingTest.java
index 9c3271bd69..fb9a3e13cf 100644
--- a/test/unit/org/apache/cassandra/schema/RemoveWithoutDroppingTest.java
+++ b/test/unit/org/apache/cassandra/schema/RemoveWithoutDroppingTest.java
@@ -28,6 +28,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.util.File;
@@ -46,6 +47,8 @@ public class RemoveWithoutDroppingTest
@BeforeClass
public static void beforeClass()
{
+ ServerTestUtils.daemonInitialization();
+
System.setProperty(SchemaUpdateHandlerFactoryProvider.SUH_FACTORY_CLASS_PROPERTY,
TestSchemaUpdateHandlerFactory.class.getName());
CQLTester.prepareServer();
Schema.instance.registerListener(listener);
diff --git a/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
b/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
index ed51518719..4a93c67bfe 100644
--- a/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
+++ b/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
@@ -23,6 +23,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
@@ -44,6 +45,8 @@ public class PartitionDenylistTest
@BeforeClass
public static void init()
{
+ ServerTestUtils.daemonInitialization();
+
CQLTester.prepareServer();
KeyspaceMetadata schema = KeyspaceMetadata.create(ks_cql,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]