This is an automated email from the ASF dual-hosted git repository.
konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new ea3d8a3d31 Wire compaction_read_disk_access_mode through cursor-based
compaction
ea3d8a3d31 is described below
commit ea3d8a3d31040a7c6e953c654482632812e7dbc0
Author: samlightfoot <[email protected]>
AuthorDate: Mon Mar 16 10:51:50 2026 +0000
Wire compaction_read_disk_access_mode through cursor-based compaction
Make MemoryUtil.clean() throw for slices/duplicates that cannot be cleaned
and fix mmap cleanup
Parameterise main compaction tests for cursor/iterator and buffered/direct
combinations
patch by Sam Lightfoot; reviewed by Ariel Weisberg, Dmitry Konstantinov for
CASSANDRA-21147
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CursorCompactor.java | 39 ++-
.../cassandra/db/compaction/StatefulCursor.java | 5 +-
.../cassandra/io/sstable/SSTableCursorReader.java | 14 +-
.../cassandra/io/sstable/format/SSTableReader.java | 44 +++-
.../io/util/DirectThreadLocalByteBufferHolder.java | 6 +-
.../io/util/DirectThreadLocalReadAheadBuffer.java | 13 +-
.../org/apache/cassandra/io/util/FileHandle.java | 5 -
.../io/util/ThreadLocalReadAheadBuffer.java | 7 +-
.../apache/cassandra/utils/memory/BufferPool.java | 6 +-
.../apache/cassandra/utils/memory/MemoryUtil.java | 47 +++-
.../db/compaction/CompactionsPurgeTest.java | 42 ++++
.../cassandra/db/compaction/CompactionsTest.java | 47 +++-
.../db/compaction/simple/SimpleCompactionTest.java | 43 ++++
.../format/SSTableReaderDataReaderTest.java | 275 +++++++++++++++++++++
.../util/DirectThreadLocalReadAheadBufferTest.java | 49 ++++
.../apache/cassandra/transport/WriteBytesTest.java | 10 +-
.../cassandra/utils/memory/MemoryUtilTest.java | 84 +++++--
18 files changed, 667 insertions(+), 70 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c99fc2e588..1c7b5a55a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Support compaction_read_disk_access_mode for cursor-based compaction
(CASSANDRA-21147)
* Allow value/element indexing on frozen collections in SAI (CASSANDRA-18492)
* Add tool to offline dump cluster metadata and the log (CASSANDRA-21129)
* Send client warnings when writing to a large partition (CASSANDRA-17258)
diff --git a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
index 3b4819c156..7d528b5e2d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
+++ b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
@@ -27,11 +27,11 @@ import java.util.List;
import java.util.function.LongPredicate;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.UnmodifiableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.AbstractCompactionController;
import org.apache.cassandra.db.ClusteringComparator;
@@ -70,6 +70,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
@@ -294,15 +295,8 @@ public class CursorCompactor extends CompactionInfo.Holder
* {@link CompactionIterator#CompactionIterator(OperationType, List,
AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker)}
*/
- // Convert Readers to Cursors
- this.sstableCursors = new StatefulCursor[sstables.size()];
+ this.sstableCursors = convertScannersToCursors(scanners, sstables,
DatabaseDescriptor.getCompactionReadDiskAccessMode());
this.sstableCursorsEqualsNext = new boolean[sstables.size()];
- UnmodifiableIterator<SSTableReader> iterator = sstables.iterator();
- for (int i = 0; i < this.sstableCursors.length; i++)
- {
- SSTableReader ssTableReader = iterator.next();
- this.sstableCursors[i] = new StatefulCursor(ssTableReader);
- }
this.enforceStrictLiveness =
controller.cfs.metadata.get().enforceStrictLiveness();
purger = new Purger(type, controller, nowInSec);
@@ -1553,6 +1547,33 @@ public class CursorCompactor extends
CompactionInfo.Holder
return sb.toString();
}
+ /**
+ * Closes scanner-opened readers before opening cursor-specific readers
with the configured disk access mode.
+ * In cursor compaction, scanners are only used for metadata; closing them
avoids holding redundant file
+ * descriptors and prevents conflicts when scan and non-scan readers for
the same file share thread-local
+ * buffer state on the same thread.
+ */
+ private static StatefulCursor[]
convertScannersToCursors(List<ISSTableScanner> scanners,
ImmutableSet<SSTableReader> sstables,
+ DiskAccessMode
diskAccessMode)
+ {
+ for (ISSTableScanner scanner : scanners)
+ scanner.close();
+
+ StatefulCursor[] cursors = new StatefulCursor[sstables.size()];
+ int i = 0;
+ try
+ {
+ for (SSTableReader reader : sstables)
+ cursors[i++] = new StatefulCursor(reader, diskAccessMode);
+ return cursors;
+ }
+ catch (RuntimeException | Error e)
+ {
+ Throwables.closeNonNullAndAddSuppressed(e, cursors);
+ throw e;
+ }
+ }
+
public void close()
{
try
diff --git a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
index c28d218ac3..f0d81a26c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
+++ b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReusableLivenessInfo;
@@ -55,9 +56,9 @@ class StatefulCursor extends SSTableCursorReader
private boolean isOpenRangeTombstonePresent = false;
- public StatefulCursor(SSTableReader reader)
+ public StatefulCursor(SSTableReader reader, DiskAccessMode diskAccessMode)
{
- super(reader);
+ super(reader, diskAccessMode);
currPartition = new
PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
prevPartition = new
PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
unfiltered = new
UnfilteredDescriptor(reader.header.clusteringTypes().toArray(AbstractType[]::new));
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
index ccf94ce6da..bfa6d4be01 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import com.google.common.collect.ImmutableList;
+import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.Columns;
import org.apache.cassandra.db.DeletionTime;
@@ -197,15 +198,20 @@ public class SSTableCursorReader implements AutoCloseable
{
TableMetadata metadata = Util.metadataFromSSTable(desc);
SSTableReader reader = SSTableReader.openNoValidation(null, desc,
TableMetadataRef.forOfflineTools(metadata));
- return new SSTableCursorReader(reader, metadata, reader.ref());
+ return new SSTableCursorReader(reader, metadata, reader.ref(), null);
}
public SSTableCursorReader(SSTableReader reader)
{
- this(reader, reader.metadata(), null);
+ this(reader, reader.metadata(), null, null);
}
- private SSTableCursorReader(SSTableReader reader, TableMetadata metadata,
Ref<SSTableReader> readerRef)
+ public SSTableCursorReader(SSTableReader reader, DiskAccessMode
diskAccessMode)
+ {
+ this(reader, reader.metadata(), null, diskAccessMode);
+ }
+
+ private SSTableCursorReader(SSTableReader reader, TableMetadata metadata,
Ref<SSTableReader> readerRef, DiskAccessMode diskAccessMode)
{
ssTableReader = reader;
ssTableReaderRef = readerRef;
@@ -221,7 +227,7 @@ public class SSTableCursorReader implements AutoCloseable
deserializationHelper = new DeserializationHelper(metadata,
version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL,
null);
serializationHeader = reader.header;
- dataReader = reader.openDataReader();
+ dataReader = reader.openDataReaderForScan(diskAccessMode);
hasStaticColumns = metadata.hasStaticColumns();
}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 37e73d47b4..ab3171200f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -38,6 +38,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.annotation.Nullable;
+
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import com.google.common.annotations.VisibleForTesting;
@@ -1417,44 +1419,60 @@ public abstract class SSTableReader extends SSTable
implements UnfilteredSource,
return sstableMetadata;
}
+ public RandomAccessReader openDataReader()
+ {
+ return openDataReaderInternal(null, null, false);
+ }
+
public RandomAccessReader openDataReader(RateLimiter limiter)
{
assert limiter != null;
- return dfile.createReader(limiter);
+ return openDataReaderInternal(null, limiter, false);
}
- public RandomAccessReader openDataReader()
+ public RandomAccessReader openDataReader(DiskAccessMode diskAccessMode)
{
- return dfile.createReader();
+ return openDataReaderInternal(diskAccessMode, null, false);
}
public RandomAccessReader openDataReaderForScan()
{
- return openDataReaderForScan(dfile.diskAccessMode());
+ return openDataReaderInternal(null, null, true);
}
public RandomAccessReader openDataReaderForScan(DiskAccessMode
diskAccessMode)
{
- boolean isSameDiskAccessMode = diskAccessMode ==
dfile.diskAccessMode();
- boolean isDirectIONotSupported = diskAccessMode ==
DiskAccessMode.direct && !dfile.supportsDirectIO();
+ return openDataReaderInternal(diskAccessMode, null, true);
+ }
- if (isSameDiskAccessMode || isDirectIONotSupported)
- return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);
+ private RandomAccessReader openDataReaderInternal(@Nullable DiskAccessMode
diskAccessMode,
+ @Nullable RateLimiter
limiter,
+ boolean forScan)
+ {
+ if (canReuseDfile(diskAccessMode))
+ return dfile.createReader(limiter, forScan,
OnReaderClose.RETAIN_FILE_OPEN);
- FileHandle dataFile = dfile.toBuilder()
- .withDiskAccessMode(diskAccessMode)
- .complete();
+ FileHandle handle = dfile.toBuilder()
+ .withDiskAccessMode(diskAccessMode)
+ .complete();
try
{
- return dataFile.createReaderForScan(OnReaderClose.CLOSE_FILE);
+ return handle.createReader(limiter, forScan,
OnReaderClose.CLOSE_FILE);
}
catch (Throwable t)
{
- dataFile.close();
+ handle.close();
throw t;
}
}
+ private boolean canReuseDfile(@Nullable DiskAccessMode diskAccessMode)
+ {
+ return diskAccessMode == null
+ || diskAccessMode == dfile.diskAccessMode()
+ || (diskAccessMode == DiskAccessMode.direct &&
!dfile.supportsDirectIO());
+ }
+
public void trySkipFileCacheBefore(DecoratedKey key)
{
long position = getPosition(key, SSTableReader.Operator.GE);
diff --git
a/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java
b/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java
index ec4c9abb77..eb46149665 100644
---
a/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java
+++
b/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java
@@ -70,10 +70,8 @@ public final class DirectThreadLocalByteBufferHolder
implements ByteBufferHolder
private static void cleanBuffer(ByteBuffer buffer)
{
- // Aligned buffers are slices; clean the backing buffer (attachment)
- DirectBuffer db = (DirectBuffer) buffer;
- ByteBuffer attachment = (ByteBuffer) db.attachment();
- MemoryUtil.clean(attachment != null ? attachment : buffer);
+ // Aligned buffers from BufferUtil.allocateDirectAligned are slices;
clean the backing buffer (attachment)
+ MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
}
}
\ No newline at end of file
diff --git
a/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java
b/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java
index a2c66cc0f7..934e3620e1 100644
---
a/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java
+++
b/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java
@@ -24,6 +24,9 @@ import org.agrona.BitUtil;
import org.agrona.BufferUtil;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+import sun.nio.ch.DirectBuffer;
public final class DirectThreadLocalReadAheadBuffer extends
ThreadLocalReadAheadBuffer
{
@@ -46,4 +49,12 @@ public final class DirectThreadLocalReadAheadBuffer extends
ThreadLocalReadAhead
if (channel.read(blockBuffer, blockPosition) < sizeToRead)
throw new CorruptSSTableException(null, channel.filePath());
}
-}
+
+ @Override
+ protected void cleanBuffer(ByteBuffer buffer)
+ {
+ // Aligned buffers from BufferUtil.allocateDirectAligned are slices;
clean the backing buffer (attachment)
+ MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
+ }
+
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java
b/src/java/org/apache/cassandra/io/util/FileHandle.java
index 1153dcb046..e4101e7eb3 100644
--- a/src/java/org/apache/cassandra/io/util/FileHandle.java
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -196,11 +196,6 @@ public class FileHandle extends SharedCloseableImpl
return createReader(null);
}
- public RandomAccessReader createReaderForScan(OnReaderClose onReaderClose)
- {
- return createReader(null, true, onReaderClose);
- }
-
/**
* Create {@link RandomAccessReader} with configured method of reading
content of the file.
* Reading from file will be rate limited by given {@link RateLimiter}.
diff --git
a/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
b/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
index 3a564931a9..c8ef6cf1e3 100644
--- a/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
@@ -156,11 +156,16 @@ public class ThreadLocalReadAheadBuffer implements
Closeable
blockBuffer.clear();
if (deallocate)
{
- MemoryUtil.clean(blockBuffer);
+ cleanBuffer(blockBuffer);
block.buffer = null;
}
}
+ protected void cleanBuffer(ByteBuffer buffer)
+ {
+ MemoryUtil.clean(buffer);
+ }
+
@Override
public void close()
{
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index c85368c3a8..dfd9702d28 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -1566,7 +1566,11 @@ public class BufferPool
if (parent != null)
parent.free(slab);
else
- MemoryUtil.clean(slab);
+ {
+ // slab may be an aligned slice from allocateDirectAligned();
clean the root allocation
+ ByteBuffer attachment = (ByteBuffer) ((DirectBuffer)
slab).attachment();
+ MemoryUtil.clean(attachment != null ? attachment : slab);
+ }
}
static void unsafeRecycle(Chunk chunk)
diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
index ec94da2c93..0f9fca5c55 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -24,6 +24,9 @@ import java.nio.ByteOrder;
import com.sun.jna.Native;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import jdk.internal.ref.Cleaner;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;
@@ -330,9 +333,47 @@ public abstract class MemoryUtil
return;
DirectBuffer db = (DirectBuffer) buffer;
- if (db.attachment() != null)
- return; // duplicate or slice
+ Cleaner cleaner = db.cleaner();
+
+ if (cleaner == null)
+ {
+ // No cleaner means this buffer does not own its memory (e.g.
slice, duplicate, pool sub-allocation,
+ // hollow buffer). A non-null attachment confirms it's a view;
null attachment means it's a hollow/synthetic
+ // buffer or was already cleaned.
+ if (db.attachment() != null)
+ throw new IllegalArgumentException(
+ "Cannot clean a buffer with no cleaner and attachment type
"
+ + db.attachment().getClass().getName() + "; this buffer
does not own its memory. "
+ + "For slices/duplicates, resolve to the root allocation
before calling clean()");
+ return;
+ }
+
+ Object attachment = db.attachment();
+ if (!isSafeAttachment(attachment))
+ throw new IllegalArgumentException(
+ "Buffer has a cleaner but an unexpected attachment type "
+ + attachment.getClass().getName()
+ + "; this may indicate corrupted buffer state");
- unsafe.invokeCleaner(buffer);
+ cleaner.clean();
}
+
+ /**
+ * Allow-list of attachment types expected on buffers that have a cleaner.
Any new attachment type set
+ * on a root allocation via {@link #setAttachment} must be added here with
justification.
+ * <ul>
+ * <li>{@code null} – normal case for root allocations ({@code
ByteBuffer.allocateDirect}) and mmap buffers</li>
+ * <li>{@link Runnable} – mmap force callback set by {@code
ListenableFileSystem} and dispatched
+ * by {@code SyncUtil.force()} to simulate mmap flush behaviour in
tests</li>
+ * <li>{@link Ref.DirectBufferRef} – reference tracking metadata set on
root allocations by
+ * {@code MerkleTree} when {@code Ref.TRACE_ENABLED} is true</li>
+ * </ul>
+ */
+ private static boolean isSafeAttachment(Object attachment)
+ {
+ return attachment == null
+ || attachment instanceof Runnable
+ || attachment instanceof Ref.DirectBufferRef;
+ }
+
}
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 315e268abe..3fa099b6fe 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -18,16 +18,23 @@
*/
package org.apache.cassandra.db.compaction;
+import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import com.google.common.collect.Iterables;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
@@ -51,6 +58,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
public class CompactionsPurgeTest
{
private static final String KEYSPACE1 = "CompactionsPurgeTest1";
@@ -62,6 +70,40 @@ public class CompactionsPurgeTest
private static final String KEYSPACE_CQL = "cql_keyspace";
private static final String CF_CQL = "table1";
+ @Parameterized.Parameter(0)
+ public DiskAccessMode compactionReadDiskAccessMode;
+
+ @Parameterized.Parameter(1)
+ public boolean cursorCompactionEnabled;
+
+ @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}")
+ public static Collection<Object[]> params()
+ {
+ return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+ new Object[]{ DiskAccessMode.standard, false },
+ new Object[]{ DiskAccessMode.direct, true },
+ new Object[]{ DiskAccessMode.direct, false });
+ }
+
+ private DiskAccessMode originalDiskAccessMode;
+ private boolean originalCursorCompactionEnabled;
+
+ @Before
+ public void setCompactionParams()
+ {
+ originalDiskAccessMode =
DatabaseDescriptor.getCompactionReadDiskAccessMode();
+ originalCursorCompactionEnabled =
DatabaseDescriptor.cursorCompactionEnabled();
+
DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+ DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+ }
+
+ @After
+ public void restoreCompactionParams()
+ {
+
DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+
DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8d9a8dcaa7..3c536fc677 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -26,13 +27,19 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -81,6 +88,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
public class CompactionsTest
{
private static final String KEYSPACE1 = "Keyspace1";
@@ -89,6 +97,41 @@ public class CompactionsTest
private static final String CF_STANDARD2 = "Standard2";
private static final String CF_STANDARD3 = "Standard3";
private static final String CF_STANDARD4 = "Standard4";
+
+ @Parameterized.Parameter(0)
+ public DiskAccessMode compactionReadDiskAccessMode;
+
+ @Parameterized.Parameter(1)
+ public boolean cursorCompactionEnabled;
+
+ @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}")
+ public static Collection<Object[]> params()
+ {
+ return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+ new Object[]{ DiskAccessMode.standard, false },
+ new Object[]{ DiskAccessMode.direct, true },
+ new Object[]{ DiskAccessMode.direct, false });
+ }
+
+ private DiskAccessMode originalDiskAccessMode;
+ private boolean originalCursorCompactionEnabled;
+
+ @Before
+ public void setCompactionParams()
+ {
+ originalDiskAccessMode =
DatabaseDescriptor.getCompactionReadDiskAccessMode();
+ originalCursorCompactionEnabled =
DatabaseDescriptor.cursorCompactionEnabled();
+
DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+ DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+ }
+
+ @After
+ public void restoreCompactionParams()
+ {
+
DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+
DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
@@ -260,8 +303,8 @@ public class CompactionsTest
public void testUserDefinedCompaction() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
- final String cfname = "Standard3"; // use clean(no sstable) CF
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD3);
+ cfs.clearUnsafe();
TableMetadata table = cfs.metadata();
// disable compaction while flushing
diff --git
a/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
b/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
index 27c7ba7e17..d5f7eedf62 100644
---
a/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
@@ -19,18 +19,61 @@
package org.apache.cassandra.db.compaction.simple;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.ExecutionException;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.utils.TestHelper;
@Ignore
+@RunWith(Parameterized.class)
public abstract class SimpleCompactionTest extends CQLTester
{
+ @Parameterized.Parameter(0)
+ public DiskAccessMode compactionReadDiskAccessMode;
+
+ @Parameterized.Parameter(1)
+ public boolean cursorCompactionEnabled;
+
+ @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}")
+ public static Collection<Object[]> params()
+ {
+ return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+ new Object[]{ DiskAccessMode.standard, false },
+ new Object[]{ DiskAccessMode.direct, true },
+ new Object[]{ DiskAccessMode.direct, false });
+ }
+
+ private DiskAccessMode originalDiskAccessMode;
+ private boolean originalCursorCompactionEnabled;
+
+ @Before
+ public void setCompactionParams()
+ {
+ originalDiskAccessMode =
DatabaseDescriptor.getCompactionReadDiskAccessMode();
+ originalCursorCompactionEnabled =
DatabaseDescriptor.cursorCompactionEnabled();
+
DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+ DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+ }
+
+ @After
+ public void restoreCompactionParams()
+ {
+
DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+
DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+ }
+
@AfterClass
public static void teardown() throws IOException, InterruptedException,
ExecutionException
{
diff --git
a/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java
new file mode 100644
index 0000000000..49165b6577
--- /dev/null
+++
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@code SSTableReader#canReuseDfile} / {@code
SSTableReader#openDataReaderInternal}.
+ */
+public class SSTableReaderDataReaderTest
+{
+ private static final String KEYSPACE = "SSTableReaderDataReaderTest";
+ private static final String CF_UNCOMPRESSED = "Uncompressed";
+ private static final String CF_COMPRESSED = "Compressed";
+
+ private static DiskAccessMode originalDiskAccessMode;
+ private final List<Ref<?>> refsToRelease = new ArrayList<>();
+
+ @BeforeClass
+ public static void defineSchema() throws Exception
+ {
+ SchemaLoader.prepareServer();
+ originalDiskAccessMode = DatabaseDescriptor.getDiskAccessMode();
+ DatabaseDescriptor.setDiskAccessMode(DiskAccessMode.standard);
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE,
CF_UNCOMPRESSED)
+
.compression(CompressionParams.noCompression()),
+ SchemaLoader.standardCFMD(KEYSPACE,
CF_COMPRESSED)
+
.compression(CompressionParams.DEFAULT));
+ CompactionManager.instance.disableAutoCompaction();
+ }
+
+ @AfterClass
+ public static void restoreConfiguration()
+ {
+ DatabaseDescriptor.setDiskAccessMode(originalDiskAccessMode);
+ }
+
+ @After
+ public void teardown()
+ {
+ Throwable exceptions = null;
+ for (Ref<?> ref : refsToRelease)
+ {
+ try
+ {
+ ref.close();
+ }
+ catch (Throwable t)
+ {
+ exceptions = Throwables.merge(exceptions, t);
+ }
+ }
+ refsToRelease.clear();
+ Throwables.maybeFail(exceptions);
+ }
+
+ @Test
+ public void testNullModeReusesExistingDfile()
+ {
+ SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+ try (RandomAccessReader reader = sstable.openDataReader())
+ {
+ assertReaderSharesDfileChannel(sstable, reader);
+ }
+ }
+
+ @Test
+ public void testSameModeReusesExistingDfile()
+ {
+ SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+ try (RandomAccessReader reader =
sstable.openDataReader(sstable.dfile.diskAccessMode()))
+ {
+ assertReaderSharesDfileChannel(sstable, reader);
+ }
+ }
+
+ @Test
+ public void testDirectOnUnsupportedFallsBackToReuse()
+ {
+ SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+ assertFalse("Uncompressed SSTable should not support direct IO",
+ sstable.dfile.supportsDirectIO());
+
+ try (RandomAccessReader reader =
sstable.openDataReader(DiskAccessMode.direct))
+ {
+ assertReaderSharesDfileChannel(sstable, reader);
+ }
+ }
+
+ @Test
+ public void testDirectOnCompressedCreatesNewHandle()
+ {
+ SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+ try (RandomAccessReader reader = sstable.openDataReader(DiskAccessMode.direct))
+ {
+ assertReaderHasOwnChannel(sstable, reader);
+ }
+ }
+
+ @Test
+ public void testNewHandleCloseDoesNotAffectOriginalDfile()
+ {
+ SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+ RandomAccessReader reader = sstable.openDataReader(DiskAccessMode.direct);
+ ChannelProxy newChannel = reader.getChannel();
+ assertNotSame(sstable.dfile.channel, newChannel);
+
+ reader.close();
+
+ assertTrue("New handle's channel should be cleaned up after reader close",
+ newChannel.isCleanedUp());
+ assertFalse("Original dfile channel should not be affected",
+ sstable.dfile.channel.isCleanedUp());
+
+ try (RandomAccessReader reader2 = sstable.openDataReader())
+ {
+ assertReaderSharesDfileChannel(sstable, reader2);
+ }
+ }
+
+ @Test
+ public void testReusedReaderCloseDoesNotAffectDfile()
+ {
+ SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+ RandomAccessReader reader = sstable.openDataReader();
+ ChannelProxy channel = reader.getChannel();
+ assertSame(sstable.dfile.channel, channel);
+
+ reader.close();
+
+ assertFalse("Dfile channel should not be cleaned up after reused reader close",
+ channel.isCleanedUp());
+
+ try (RandomAccessReader reader2 = sstable.openDataReader())
+ {
+ assertReaderSharesDfileChannel(sstable, reader2);
+ }
+ }
+
+ @Test
+ public void testMultipleNewHandleReadersDoNotLeakResources()
+ {
+ SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+ ChannelProxy[] newChannels = new ChannelProxy[3];
+ for (int i = 0; i < 3; i++)
+ {
+ try (RandomAccessReader reader = sstable.openDataReader(DiskAccessMode.direct))
+ {
+ newChannels[i] = reader.getChannel();
+ assertNotSame(sstable.dfile.channel, newChannels[i]);
+ }
+ }
+
+ for (int i = 0; i < 3; i++)
+ assertTrue("New handle channel " + i + " should be cleaned up",
+ newChannels[i].isCleanedUp());
+
+ assertFalse(sstable.dfile.channel.isCleanedUp());
+ try (RandomAccessReader reader = sstable.openDataReader())
+ {
+ assertReaderSharesDfileChannel(sstable, reader);
+ assertEquals(0, reader.getFilePointer());
+ }
+ }
+
+ @Test
+ public void testForScanReusesWithNullMode()
+ {
+ SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+ try (RandomAccessReader reader = sstable.openDataReaderForScan())
+ {
+ assertReaderSharesDfileChannel(sstable, reader);
+ }
+ }
+
+ @Test
+ public void testForScanCreatesNewHandleWithDirect()
+ {
+ SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+ try (RandomAccessReader reader = sstable.openDataReaderForScan(DiskAccessMode.direct))
+ {
+ assertReaderHasOwnChannel(sstable, reader);
+ }
+ }
+
+ private void assertReaderSharesDfileChannel(SSTableReader sstable, RandomAccessReader reader)
+ {
+ assertNotNull(reader);
+ assertSame("Reader should share the dfile's channel (reuse path)",
+ sstable.dfile.channel, reader.getChannel());
+ }
+
+ private void assertReaderHasOwnChannel(SSTableReader sstable, RandomAccessReader reader)
+ {
+ assertNotNull(reader);
+ assertNotSame("Reader should have its own channel (new handle path)",
+ sstable.dfile.channel, reader.getChannel());
+ }
+
+ private SSTableReader createSSTable(String cf)
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
+ store.clearUnsafe();
+
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+ new RowUpdateBuilder(store.metadata(), timestamp, String.valueOf(i))
+ .clustering("col")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush(UNIT_TESTS);
+
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+ refsToRelease.add(sstable.selfRef());
+ return sstable;
+ }
+}
diff --git a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java
index 84b60bc5c8..46b3091f11 100644
--- a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java
@@ -17,8 +17,12 @@
*/
package org.apache.cassandra.io.util;
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ManagementFactory;
import java.util.Arrays;
+import java.util.List;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.utils.Pair;
@@ -61,4 +65,49 @@ public class DirectThreadLocalReadAheadBufferTest extends ThreadLocalReadAheadBu
tlrab.close();
}
}
+
+ @Test
+ public void testDirectMemoryIsCleanedOnClose()
+ {
+ BufferPoolMXBean directPool = getDirectBufferPool();
+ int blockSize = FileUtils.getFileBlockSize(files[0]);
+ int bufferSize = 64 * 1024 * 1024; // 64MB - large enough to reliably detect
+
+ try (ChannelProxy channel = new ChannelProxy(files[0], ChannelProxy.IOMode.DIRECT))
+ {
+ DirectThreadLocalReadAheadBuffer tlrab =
+ new DirectThreadLocalReadAheadBuffer(channel, bufferSize, blockSize);
+
+ // Force buffer allocation
+ tlrab.allocateBuffer();
+
+ long memoryUsedBefore = directPool.getMemoryUsed();
+
+ // Close should clean the direct memory
+ tlrab.close();
+
+ long memoryUsedAfter = directPool.getMemoryUsed();
+
+ // Memory should decrease by approximately buffer size (+ alignment overhead)
+ long expectedDecrease = bufferSize;
+ long actualDecrease = memoryUsedBefore - memoryUsedAfter;
+
+ Assert.assertTrue(
+ "Direct memory should decrease after close(). " +
+ "Before: " + memoryUsedBefore + ", After: " + memoryUsedAfter +
+ ", Expected decrease: ~" + expectedDecrease + ", Actual: " + actualDecrease,
+ actualDecrease >= expectedDecrease * 0.9); // 10% tolerance for alignment
+ }
+ }
+
+ private static BufferPoolMXBean getDirectBufferPool()
+ {
+ List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+ for (BufferPoolMXBean pool : pools)
+ if (pool.getName().equals("direct"))
+ return pool;
+
+ throw new IllegalStateException("Direct buffer pool not found");
+ }
+
}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
index 19a70e999f..dc522a4f2e 100644
--- a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
+++ b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.transport;
+import java.nio.ByteBuffer;
+
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -26,6 +28,7 @@ import org.apache.cassandra.utils.memory.MemoryUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import sun.nio.ch.DirectBuffer;
import static org.quicktheories.QuickTheory.qt;
@@ -52,7 +55,12 @@ public class WriteBytesTest
Assertions.assertThat(buf.writerIndex()).isEqualTo(size);
for (int i = 0; i < size; i++)
Assertions.assertThat(buf.getByte(buf.readerIndex() + i)).describedAs("byte mismatch at index %d", i).isEqualTo(bb.get(bb.position() + i));
- MemoryUtil.clean(bb);
+
+ if (bb.isDirect())
+ {
+ Object attachment = ((DirectBuffer) bb).attachment();
+ MemoryUtil.clean(attachment == null ? bb : (ByteBuffer) attachment);
+ }
});
}
diff --git a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
index 842823a2d5..c021af98ad 100644
--- a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
@@ -50,35 +50,31 @@ public class MemoryUtilTest
}
@Test
- public void testCleanViewDoesNotThrow()
+ public void testCleanSliceThrows()
{
- // Use a large buffer to likely get mmap'd memory from the OS. This ensures that if cleaning a view incorrectly
- // unmaps the original buffer's memory, subsequent access to 'original' would more reliably fail.
- // For context: glibc's mmap threshold is 32MB on 64-bit systems
- ByteBuffer original = ByteBuffer.allocateDirect(64 * 1024 * 1024);
-
+ ByteBuffer original = ByteBuffer.allocateDirect(1024);
ByteBuffer slice = original.slice();
- MemoryUtil.clean(slice);
- try
- {
- original.putInt(10);
- }
- catch (Exception exc)
- {
- Assertions.fail("Unable to write to original buffer after cleaning (slice). " + exc.getMessage(), exc);
- }
+ Assertions.assertThatThrownBy(() -> MemoryUtil.clean(slice))
+ .isInstanceOf(IllegalArgumentException.class);
+
+ // original should still be usable after the rejected clean
+ original.putInt(10);
+ MemoryUtil.clean(original);
+ }
+
+ @Test
+ public void testCleanDuplicateThrows()
+ {
+ ByteBuffer original = ByteBuffer.allocateDirect(1024);
ByteBuffer duplicate = original.duplicate();
- MemoryUtil.clean(duplicate);
- try
- {
- original.putInt(10);
- }
- catch (Exception exc)
- {
- Assertions.fail("Unable to write to original buffer after cleaning (duplicate). " + exc.getMessage(), exc);
- }
+ Assertions.assertThatThrownBy(() -> MemoryUtil.clean(duplicate))
+ .isInstanceOf(IllegalArgumentException.class);
+
+ // original should still be usable after the rejected clean
+ original.putInt(10);
+ MemoryUtil.clean(original);
}
@Test
@@ -88,6 +84,46 @@ public class MemoryUtilTest
MemoryUtil.clean(original);
}
+ @Test
+ public void testCleanWithNonNullAttachmentAndCleanerSucceeds()
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
+ MemoryUtil.setAttachment(buffer, (Runnable) () -> {});
+ MemoryUtil.clean(buffer);
+ }
+
+ @Test
+ public void testCleanNoCleanerWithAttachmentThrows()
+ {
+ ByteBuffer hollow = MemoryUtil.getHollowDirectByteBuffer();
+ MemoryUtil.setAttachment(hollow, new Object());
+
+ Assertions.assertThatThrownBy(() -> MemoryUtil.clean(hollow))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("does not own its memory");
+ }
+
+ @Test
+ public void testCleanNoCleanerNoAttachmentIsNoOp()
+ {
+ ByteBuffer hollow = MemoryUtil.getHollowDirectByteBuffer();
+ MemoryUtil.clean(hollow);
+ }
+
+ @Test
+ public void testCleanWithCleanerButUnexpectedAttachmentThrows()
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
+ MemoryUtil.setAttachment(buffer, new Object());
+
+ Assertions.assertThatThrownBy(() -> MemoryUtil.clean(buffer))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("unexpected attachment type");
+
+ MemoryUtil.setAttachment(buffer, null);
+ MemoryUtil.clean(buffer);
+ }
+
private static BufferPoolMXBean getDirectBufferPool()
{
List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]