This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ea3d8a3d31 Wire compaction_read_disk_access_mode through cursor-based 
compaction
ea3d8a3d31 is described below

commit ea3d8a3d31040a7c6e953c654482632812e7dbc0
Author: samlightfoot <[email protected]>
AuthorDate: Mon Mar 16 10:51:50 2026 +0000

    Wire compaction_read_disk_access_mode through cursor-based compaction
    
    Make MemoryUtil.clean() throw for slices/duplicates that cannot be cleaned 
and fix mmap cleanup
    Parameterise main compaction tests for cursor/iterator and buffered/direct 
combinations
    
    patch by Sam Lightfoot; reviewed by Ariel Weisberg, Dmitry Konstantinov for 
CASSANDRA-21147
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CursorCompactor.java   |  39 ++-
 .../cassandra/db/compaction/StatefulCursor.java    |   5 +-
 .../cassandra/io/sstable/SSTableCursorReader.java  |  14 +-
 .../cassandra/io/sstable/format/SSTableReader.java |  44 +++-
 .../io/util/DirectThreadLocalByteBufferHolder.java |   6 +-
 .../io/util/DirectThreadLocalReadAheadBuffer.java  |  13 +-
 .../org/apache/cassandra/io/util/FileHandle.java   |   5 -
 .../io/util/ThreadLocalReadAheadBuffer.java        |   7 +-
 .../apache/cassandra/utils/memory/BufferPool.java  |   6 +-
 .../apache/cassandra/utils/memory/MemoryUtil.java  |  47 +++-
 .../db/compaction/CompactionsPurgeTest.java        |  42 ++++
 .../cassandra/db/compaction/CompactionsTest.java   |  47 +++-
 .../db/compaction/simple/SimpleCompactionTest.java |  43 ++++
 .../format/SSTableReaderDataReaderTest.java        | 275 +++++++++++++++++++++
 .../util/DirectThreadLocalReadAheadBufferTest.java |  49 ++++
 .../apache/cassandra/transport/WriteBytesTest.java |  10 +-
 .../cassandra/utils/memory/MemoryUtilTest.java     |  84 +++++--
 18 files changed, 667 insertions(+), 70 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c99fc2e588..1c7b5a55a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Support compaction_read_disk_access_mode for cursor-based compaction 
(CASSANDRA-21147)
  * Allow value/element indexing on frozen collections in SAI (CASSANDRA-18492)
  * Add tool to offline dump cluster metadata and the log (CASSANDRA-21129)
  * Send client warnings when writing to a large partition (CASSANDRA-17258)
diff --git a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java 
b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
index 3b4819c156..7d528b5e2d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
+++ b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
@@ -27,11 +27,11 @@ import java.util.List;
 import java.util.function.LongPredicate;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.UnmodifiableIterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Config.DiskAccessMode;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.AbstractCompactionController;
 import org.apache.cassandra.db.ClusteringComparator;
@@ -70,6 +70,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.TimeUUID;
 
 import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
@@ -294,15 +295,8 @@ public class CursorCompactor extends CompactionInfo.Holder
          * {@link CompactionIterator#CompactionIterator(OperationType, List, 
AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker)}
          */
 
-        // Convert Readers to Cursors
-        this.sstableCursors = new StatefulCursor[sstables.size()];
+        this.sstableCursors = convertScannersToCursors(scanners, sstables, 
DatabaseDescriptor.getCompactionReadDiskAccessMode());
         this.sstableCursorsEqualsNext = new boolean[sstables.size()];
-        UnmodifiableIterator<SSTableReader> iterator = sstables.iterator();
-        for (int i = 0; i < this.sstableCursors.length; i++)
-        {
-            SSTableReader ssTableReader = iterator.next();
-            this.sstableCursors[i] = new StatefulCursor(ssTableReader);
-        }
         this.enforceStrictLiveness = 
controller.cfs.metadata.get().enforceStrictLiveness();
 
         purger = new Purger(type, controller, nowInSec);
@@ -1553,6 +1547,33 @@ public class CursorCompactor extends 
CompactionInfo.Holder
         return sb.toString();
     }
 
+    /**
+     * Closes scanner-opened readers before opening cursor-specific readers 
with the configured disk access mode.
+     * In cursor compaction, scanners are only used for metadata; closing them 
avoids holding redundant file
+     * descriptors and prevents conflicts when scan and non-scan readers for 
the same file share thread-local
+     * buffer state on the same thread.
+     */
+    private static StatefulCursor[] 
convertScannersToCursors(List<ISSTableScanner> scanners, 
ImmutableSet<SSTableReader> sstables,
+                                                             DiskAccessMode 
diskAccessMode)
+    {
+        for (ISSTableScanner scanner : scanners)
+            scanner.close();
+
+        StatefulCursor[] cursors = new StatefulCursor[sstables.size()];
+        int i = 0;
+        try
+        {
+            for (SSTableReader reader : sstables)
+                cursors[i++] = new StatefulCursor(reader, diskAccessMode);
+            return cursors;
+        }
+        catch (RuntimeException | Error e)
+        {
+            Throwables.closeNonNullAndAddSuppressed(e, cursors);
+            throw e;
+        }
+    }
+
     public void close()
     {
         try
diff --git a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java 
b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
index c28d218ac3..f0d81a26c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
+++ b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.DiskAccessMode;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.ReusableLivenessInfo;
@@ -55,9 +56,9 @@ class StatefulCursor extends SSTableCursorReader
 
     private boolean isOpenRangeTombstonePresent = false;
 
-    public StatefulCursor(SSTableReader reader)
+    public StatefulCursor(SSTableReader reader, DiskAccessMode diskAccessMode)
     {
-        super(reader);
+        super(reader, diskAccessMode);
         currPartition = new 
PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
         prevPartition = new 
PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
         unfiltered = new 
UnfilteredDescriptor(reader.header.clusteringTypes().toArray(AbstractType[]::new));
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
index ccf94ce6da..bfa6d4be01 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import com.google.common.collect.ImmutableList;
 
+import org.apache.cassandra.config.Config.DiskAccessMode;
 import org.apache.cassandra.db.ClusteringPrefix;
 import org.apache.cassandra.db.Columns;
 import org.apache.cassandra.db.DeletionTime;
@@ -197,15 +198,20 @@ public class SSTableCursorReader implements AutoCloseable
     {
         TableMetadata metadata = Util.metadataFromSSTable(desc);
         SSTableReader reader = SSTableReader.openNoValidation(null, desc, 
TableMetadataRef.forOfflineTools(metadata));
-        return new SSTableCursorReader(reader, metadata, reader.ref());
+        return new SSTableCursorReader(reader, metadata, reader.ref(), null);
     }
 
     public SSTableCursorReader(SSTableReader reader)
     {
-        this(reader, reader.metadata(), null);
+        this(reader, reader.metadata(), null, null);
     }
 
-    private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, 
Ref<SSTableReader> readerRef)
+    public SSTableCursorReader(SSTableReader reader, DiskAccessMode 
diskAccessMode)
+    {
+        this(reader, reader.metadata(), null, diskAccessMode);
+    }
+
+    private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, 
Ref<SSTableReader> readerRef, DiskAccessMode diskAccessMode)
     {
         ssTableReader = reader;
         ssTableReaderRef = readerRef;
@@ -221,7 +227,7 @@ public class SSTableCursorReader implements AutoCloseable
         deserializationHelper = new DeserializationHelper(metadata, 
version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL, 
null);
         serializationHeader = reader.header;
 
-        dataReader = reader.openDataReader();
+        dataReader = reader.openDataReaderForScan(diskAccessMode);
         hasStaticColumns = metadata.hasStaticColumns();
     }
 
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 37e73d47b4..ab3171200f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -38,6 +38,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.annotation.Nullable;
+
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 import com.google.common.annotations.VisibleForTesting;
@@ -1417,44 +1419,60 @@ public abstract class SSTableReader extends SSTable 
implements UnfilteredSource,
         return sstableMetadata;
     }
 
+    public RandomAccessReader openDataReader()
+    {
+        return openDataReaderInternal(null, null, false);
+    }
+
     public RandomAccessReader openDataReader(RateLimiter limiter)
     {
         assert limiter != null;
-        return dfile.createReader(limiter);
+        return openDataReaderInternal(null, limiter, false);
     }
 
-    public RandomAccessReader openDataReader()
+    public RandomAccessReader openDataReader(DiskAccessMode diskAccessMode)
     {
-        return dfile.createReader();
+        return openDataReaderInternal(diskAccessMode, null, false);
     }
 
     public RandomAccessReader openDataReaderForScan()
     {
-        return openDataReaderForScan(dfile.diskAccessMode());
+        return openDataReaderInternal(null, null, true);
     }
 
     public RandomAccessReader openDataReaderForScan(DiskAccessMode 
diskAccessMode)
     {
-        boolean isSameDiskAccessMode = diskAccessMode == 
dfile.diskAccessMode();
-        boolean isDirectIONotSupported = diskAccessMode == 
DiskAccessMode.direct && !dfile.supportsDirectIO();
+        return openDataReaderInternal(diskAccessMode, null, true);
+    }
 
-        if (isSameDiskAccessMode || isDirectIONotSupported)
-            return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);
+    private RandomAccessReader openDataReaderInternal(@Nullable DiskAccessMode 
diskAccessMode,
+                                                      @Nullable RateLimiter 
limiter,
+                                                      boolean forScan)
+    {
+        if (canReuseDfile(diskAccessMode))
+            return dfile.createReader(limiter, forScan, 
OnReaderClose.RETAIN_FILE_OPEN);
 
-        FileHandle dataFile = dfile.toBuilder()
-                                   .withDiskAccessMode(diskAccessMode)
-                                   .complete();
+        FileHandle handle = dfile.toBuilder()
+                                 .withDiskAccessMode(diskAccessMode)
+                                 .complete();
         try
         {
-            return dataFile.createReaderForScan(OnReaderClose.CLOSE_FILE);
+            return handle.createReader(limiter, forScan, 
OnReaderClose.CLOSE_FILE);
         }
         catch (Throwable t)
         {
-            dataFile.close();
+            handle.close();
             throw t;
         }
     }
 
+    private boolean canReuseDfile(@Nullable DiskAccessMode diskAccessMode)
+    {
+        return diskAccessMode == null
+               || diskAccessMode == dfile.diskAccessMode()
+               || (diskAccessMode == DiskAccessMode.direct && 
!dfile.supportsDirectIO());
+    }
+
     public void trySkipFileCacheBefore(DecoratedKey key)
     {
         long position = getPosition(key, SSTableReader.Operator.GE);
diff --git 
a/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java 
b/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java
index ec4c9abb77..eb46149665 100644
--- 
a/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java
+++ 
b/src/java/org/apache/cassandra/io/util/DirectThreadLocalByteBufferHolder.java
@@ -70,10 +70,8 @@ public final class DirectThreadLocalByteBufferHolder 
implements ByteBufferHolder
 
     private static void cleanBuffer(ByteBuffer buffer)
     {
-        // Aligned buffers are slices; clean the backing buffer (attachment)
-        DirectBuffer db = (DirectBuffer) buffer;
-        ByteBuffer attachment = (ByteBuffer) db.attachment();
-        MemoryUtil.clean(attachment != null ? attachment : buffer);
+        // Aligned buffers from BufferUtil.allocateDirectAligned are slices; 
clean the backing buffer (attachment)
+        MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
     }
 
 }
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java 
b/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java
index a2c66cc0f7..934e3620e1 100644
--- 
a/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java
+++ 
b/src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java
@@ -24,6 +24,9 @@ import org.agrona.BitUtil;
 import org.agrona.BufferUtil;
 
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+import sun.nio.ch.DirectBuffer;
 
 public final class DirectThreadLocalReadAheadBuffer extends 
ThreadLocalReadAheadBuffer
 {
@@ -46,4 +49,12 @@ public final class DirectThreadLocalReadAheadBuffer extends 
ThreadLocalReadAhead
         if (channel.read(blockBuffer, blockPosition) < sizeToRead)
             throw new CorruptSSTableException(null, channel.filePath());
     }
-}
+
+    @Override
+    protected void cleanBuffer(ByteBuffer buffer)
+    {
+        // Aligned buffers from BufferUtil.allocateDirectAligned are slices; 
clean the backing buffer (attachment)
+        MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
+    }
+
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java 
b/src/java/org/apache/cassandra/io/util/FileHandle.java
index 1153dcb046..e4101e7eb3 100644
--- a/src/java/org/apache/cassandra/io/util/FileHandle.java
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -196,11 +196,6 @@ public class FileHandle extends SharedCloseableImpl
         return createReader(null);
     }
 
-    public RandomAccessReader createReaderForScan(OnReaderClose onReaderClose)
-    {
-        return createReader(null, true, onReaderClose);
-    }
-
     /**
      * Create {@link RandomAccessReader} with configured method of reading 
content of the file.
      * Reading from file will be rate limited by given {@link RateLimiter}.
diff --git 
a/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java 
b/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
index 3a564931a9..c8ef6cf1e3 100644
--- a/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
@@ -156,11 +156,16 @@ public class ThreadLocalReadAheadBuffer implements 
Closeable
         blockBuffer.clear();
         if (deallocate)
         {
-            MemoryUtil.clean(blockBuffer);
+            cleanBuffer(blockBuffer);
             block.buffer = null;
         }
     }
 
+    protected void cleanBuffer(ByteBuffer buffer)
+    {
+        MemoryUtil.clean(buffer);
+    }
+
     @Override
     public void close()
     {
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java 
b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index c85368c3a8..dfd9702d28 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -1566,7 +1566,11 @@ public class BufferPool
             if (parent != null)
                 parent.free(slab);
             else
-                MemoryUtil.clean(slab);
+            {
+                // slab may be an aligned slice from allocateDirectAligned(); 
clean the root allocation
+                ByteBuffer attachment = (ByteBuffer) ((DirectBuffer) 
slab).attachment();
+                MemoryUtil.clean(attachment != null ? attachment : slab);
+            }
         }
 
         static void unsafeRecycle(Chunk chunk)
diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java 
b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
index ec94da2c93..0f9fca5c55 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -24,6 +24,9 @@ import java.nio.ByteOrder;
 
 import com.sun.jna.Native;
 
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import jdk.internal.ref.Cleaner;
 import sun.misc.Unsafe;
 import sun.nio.ch.DirectBuffer;
 
@@ -330,9 +333,47 @@ public abstract class MemoryUtil
             return;
 
         DirectBuffer db = (DirectBuffer) buffer;
-        if (db.attachment() != null)
-            return; // duplicate or slice
+        Cleaner cleaner = db.cleaner();
+
+        if (cleaner == null)
+        {
+            // No cleaner means this buffer does not own its memory (e.g. 
slice, duplicate, pool sub-allocation,
+            // hollow buffer). A non-null attachment confirms it's a view; 
null attachment means it's a hollow/synthetic
+            // buffer or was already cleaned.
+            if (db.attachment() != null)
+                throw new IllegalArgumentException(
+                    "Cannot clean a buffer with no cleaner and attachment type 
"
+                    + db.attachment().getClass().getName() + "; this buffer 
does not own its memory. "
+                    + "For slices/duplicates, resolve to the root allocation 
before calling clean()");
+            return;
+        }
+
+        Object attachment = db.attachment();
+        if (!isSafeAttachment(attachment))
+            throw new IllegalArgumentException(
+                "Buffer has a cleaner but an unexpected attachment type "
+                + attachment.getClass().getName()
+                + "; this may indicate corrupted buffer state");
 
-        unsafe.invokeCleaner(buffer);
+        cleaner.clean();
     }
+
+    /**
+     * Allow-list of attachment types expected on buffers that have a cleaner. 
Any new attachment type set
+     * on a root allocation via {@link #setAttachment} must be added here with 
justification.
+     * <ul>
+     *   <li>{@code null} – normal case for root allocations ({@code 
ByteBuffer.allocateDirect}) and mmap buffers</li>
+     *   <li>{@link Runnable} – mmap force callback set by {@code 
ListenableFileSystem} and dispatched
+     *       by {@code SyncUtil.force()} to simulate mmap flush behaviour in 
tests</li>
+     *   <li>{@link Ref.DirectBufferRef} – reference tracking metadata set on 
root allocations by
+     *       {@code MerkleTree} when {@code Ref.TRACE_ENABLED} is true</li>
+     * </ul>
+     */
+    private static boolean isSafeAttachment(Object attachment)
+    {
+        return attachment == null
+               || attachment instanceof Runnable
+               || attachment instanceof Ref.DirectBufferRef;
+    }
+
 }
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 315e268abe..3fa099b6fe 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -18,16 +18,23 @@
 */
 package org.apache.cassandra.db.compaction;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 
 import com.google.common.collect.Iterables;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
@@ -51,6 +58,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class CompactionsPurgeTest
 {
     private static final String KEYSPACE1 = "CompactionsPurgeTest1";
@@ -62,6 +70,40 @@ public class CompactionsPurgeTest
     private static final String KEYSPACE_CQL = "cql_keyspace";
     private static final String CF_CQL = "table1";
 
+    @Parameterized.Parameter(0)
+    public DiskAccessMode compactionReadDiskAccessMode;
+
+    @Parameterized.Parameter(1)
+    public boolean cursorCompactionEnabled;
+
+    @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}")
+    public static Collection<Object[]> params()
+    {
+        return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+                             new Object[]{ DiskAccessMode.standard, false },
+                             new Object[]{ DiskAccessMode.direct, true },
+                             new Object[]{ DiskAccessMode.direct, false });
+    }
+
+    private DiskAccessMode originalDiskAccessMode;
+    private boolean originalCursorCompactionEnabled;
+
+    @Before
+    public void setCompactionParams()
+    {
+        originalDiskAccessMode = 
DatabaseDescriptor.getCompactionReadDiskAccessMode();
+        originalCursorCompactionEnabled = 
DatabaseDescriptor.cursorCompactionEnabled();
+        
DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+    }
+
+    @After
+    public void restoreCompactionParams()
+    {
+        
DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+        
DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+    }
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8d9a8dcaa7..3c536fc677 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,13 +27,19 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -81,6 +88,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class CompactionsTest
 {
     private static final String KEYSPACE1 = "Keyspace1";
@@ -89,6 +97,41 @@ public class CompactionsTest
     private static final String CF_STANDARD2 = "Standard2";
     private static final String CF_STANDARD3 = "Standard3";
     private static final String CF_STANDARD4 = "Standard4";
+
+    @Parameterized.Parameter(0)
+    public DiskAccessMode compactionReadDiskAccessMode;
+
+    @Parameterized.Parameter(1)
+    public boolean cursorCompactionEnabled;
+
+    @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}")
+    public static Collection<Object[]> params()
+    {
+        return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+                             new Object[]{ DiskAccessMode.standard, false },
+                             new Object[]{ DiskAccessMode.direct, true },
+                             new Object[]{ DiskAccessMode.direct, false });
+    }
+
+    private DiskAccessMode originalDiskAccessMode;
+    private boolean originalCursorCompactionEnabled;
+
+    @Before
+    public void setCompactionParams()
+    {
+        originalDiskAccessMode = 
DatabaseDescriptor.getCompactionReadDiskAccessMode();
+        originalCursorCompactionEnabled = 
DatabaseDescriptor.cursorCompactionEnabled();
+        
DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+    }
+
+    @After
+    public void restoreCompactionParams()
+    {
+        
DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+        
DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+    }
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -260,8 +303,8 @@ public class CompactionsTest
     public void testUserDefinedCompaction() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        final String cfname = "Standard3"; // use clean(no sstable) CF
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD3);
+        cfs.clearUnsafe();
         TableMetadata table = cfs.metadata();
 
         // disable compaction while flushing
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java 
b/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
index 27c7ba7e17..d5f7eedf62 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
@@ -19,18 +19,61 @@
 package org.apache.cassandra.db.compaction.simple;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.utils.TestHelper;
 
 
 @Ignore
+@RunWith(Parameterized.class)
 public abstract class SimpleCompactionTest extends CQLTester
 {
+    @Parameterized.Parameter(0)
+    public DiskAccessMode compactionReadDiskAccessMode;
+
+    @Parameterized.Parameter(1)
+    public boolean cursorCompactionEnabled;
+
+    @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}")
+    public static Collection<Object[]> params()
+    {
+        return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+                             new Object[]{ DiskAccessMode.standard, false },
+                             new Object[]{ DiskAccessMode.direct, true },
+                             new Object[]{ DiskAccessMode.direct, false });
+    }
+
+    private DiskAccessMode originalDiskAccessMode;
+    private boolean originalCursorCompactionEnabled;
+
+    @Before
+    public void setCompactionParams()
+    {
+        originalDiskAccessMode = 
DatabaseDescriptor.getCompactionReadDiskAccessMode();
+        originalCursorCompactionEnabled = 
DatabaseDescriptor.cursorCompactionEnabled();
+        
DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+    }
+
+    @After
+    public void restoreCompactionParams()
+    {
+        
DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+        
DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+    }
+
     @AfterClass
     public static void teardown() throws IOException, InterruptedException, 
ExecutionException
     {
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java
 
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java
new file mode 100644
index 0000000000..49165b6577
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@code SSTableReader#canReuseDfile} / {@code 
SSTableReader#openDataReaderInternal}.
+ */
+public class SSTableReaderDataReaderTest
+{
+    private static final String KEYSPACE = "SSTableReaderDataReaderTest";
+    private static final String CF_UNCOMPRESSED = "Uncompressed";
+    private static final String CF_COMPRESSED = "Compressed";
+
+    private static DiskAccessMode originalDiskAccessMode;
+    private final List<Ref<?>> refsToRelease = new ArrayList<>();
+
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        originalDiskAccessMode = DatabaseDescriptor.getDiskAccessMode();
+        DatabaseDescriptor.setDiskAccessMode(DiskAccessMode.standard);
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, 
CF_UNCOMPRESSED)
+                                                
.compression(CompressionParams.noCompression()),
+                                    SchemaLoader.standardCFMD(KEYSPACE, 
CF_COMPRESSED)
+                                                
.compression(CompressionParams.DEFAULT));
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    @AfterClass
+    public static void restoreConfiguration()
+    {
+        DatabaseDescriptor.setDiskAccessMode(originalDiskAccessMode);
+    }
+
+    @After
+    public void teardown()
+    {
+        Throwable exceptions = null;
+        for (Ref<?> ref : refsToRelease)
+        {
+            try
+            {
+                ref.close();
+            }
+            catch (Throwable t)
+            {
+                exceptions = Throwables.merge(exceptions, t);
+            }
+        }
+        refsToRelease.clear();
+        Throwables.maybeFail(exceptions);
+    }
+
+    @Test
+    public void testNullModeReusesExistingDfile()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+        try (RandomAccessReader reader = sstable.openDataReader())
+        {
+            assertReaderSharesDfileChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testSameModeReusesExistingDfile()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+        try (RandomAccessReader reader = 
sstable.openDataReader(sstable.dfile.diskAccessMode()))
+        {
+            assertReaderSharesDfileChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testDirectOnUnsupportedFallsBackToReuse()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+        assertFalse("Uncompressed SSTable should not support direct IO",
+                     sstable.dfile.supportsDirectIO());
+
+        try (RandomAccessReader reader = 
sstable.openDataReader(DiskAccessMode.direct))
+        {
+            assertReaderSharesDfileChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testDirectOnCompressedCreatesNewHandle()
+    {
+        SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+        try (RandomAccessReader reader = 
sstable.openDataReader(DiskAccessMode.direct))
+        {
+            assertReaderHasOwnChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testNewHandleCloseDoesNotAffectOriginalDfile()
+    {
+        SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+        RandomAccessReader reader = 
sstable.openDataReader(DiskAccessMode.direct);
+        ChannelProxy newChannel = reader.getChannel();
+        assertNotSame(sstable.dfile.channel, newChannel);
+
+        reader.close();
+
+        assertTrue("New handle's channel should be cleaned up after reader 
close",
+                    newChannel.isCleanedUp());
+        assertFalse("Original dfile channel should not be affected",
+                     sstable.dfile.channel.isCleanedUp());
+
+        try (RandomAccessReader reader2 = sstable.openDataReader())
+        {
+            assertReaderSharesDfileChannel(sstable, reader2);
+        }
+    }
+
+    @Test
+    public void testReusedReaderCloseDoesNotAffectDfile()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+        RandomAccessReader reader = sstable.openDataReader();
+        ChannelProxy channel = reader.getChannel();
+        assertSame(sstable.dfile.channel, channel);
+
+        reader.close();
+
+        assertFalse("Dfile channel should not be cleaned up after reused 
reader close",
+                     channel.isCleanedUp());
+
+        try (RandomAccessReader reader2 = sstable.openDataReader())
+        {
+            assertReaderSharesDfileChannel(sstable, reader2);
+        }
+    }
+
+    @Test
+    public void testMultipleNewHandleReadersDoNotLeakResources()
+    {
+        SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+        ChannelProxy[] newChannels = new ChannelProxy[3];
+        for (int i = 0; i < 3; i++)
+        {
+            try (RandomAccessReader reader = 
sstable.openDataReader(DiskAccessMode.direct))
+            {
+                newChannels[i] = reader.getChannel();
+                assertNotSame(sstable.dfile.channel, newChannels[i]);
+            }
+        }
+
+        for (int i = 0; i < 3; i++)
+            assertTrue("New handle channel " + i + " should be cleaned up",
+                        newChannels[i].isCleanedUp());
+
+        assertFalse(sstable.dfile.channel.isCleanedUp());
+        try (RandomAccessReader reader = sstable.openDataReader())
+        {
+            assertReaderSharesDfileChannel(sstable, reader);
+            assertEquals(0, reader.getFilePointer());
+        }
+    }
+
+    @Test
+    public void testForScanReusesWithNullMode()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+        try (RandomAccessReader reader = sstable.openDataReaderForScan())
+        {
+            assertReaderSharesDfileChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testForScanCreatesNewHandleWithDirect()
+    {
+        SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+        try (RandomAccessReader reader = 
sstable.openDataReaderForScan(DiskAccessMode.direct))
+        {
+            assertReaderHasOwnChannel(sstable, reader);
+        }
+    }
+
+    private void assertReaderSharesDfileChannel(SSTableReader sstable, 
RandomAccessReader reader)
+    {
+        assertNotNull(reader);
+        assertSame("Reader should share the dfile's channel (reuse path)",
+                    sstable.dfile.channel, reader.getChannel());
+    }
+
+    private void assertReaderHasOwnChannel(SSTableReader sstable, 
RandomAccessReader reader)
+    {
+        assertNotNull(reader);
+        assertNotSame("Reader should have its own channel (new handle path)",
+                       sstable.dfile.channel, reader.getChannel());
+    }
+
+    private SSTableReader createSSTable(String cf)
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
+        store.clearUnsafe();
+
+        long timestamp = System.currentTimeMillis();
+        for (int i = 0; i < 10; i++)
+        {
+            new RowUpdateBuilder(store.metadata(), timestamp, 
String.valueOf(i))
+                .clustering("col")
+                .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                .build()
+                .applyUnsafe();
+        }
+        store.forceBlockingFlush(UNIT_TESTS);
+
+        SSTableReader sstable = store.getLiveSSTables().iterator().next();
+        refsToRelease.add(sstable.selfRef());
+        return sstable;
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java
 
b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java
index 84b60bc5c8..46b3091f11 100644
--- 
a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java
+++ 
b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java
@@ -17,8 +17,12 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ManagementFactory;
 import java.util.Arrays;
+import java.util.List;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.utils.Pair;
@@ -61,4 +65,49 @@ public class DirectThreadLocalReadAheadBufferTest extends 
ThreadLocalReadAheadBu
             tlrab.close();
         }
     }
+
+    @Test
+    public void testDirectMemoryIsCleanedOnClose()
+    {
+        BufferPoolMXBean directPool = getDirectBufferPool();
+        int blockSize = FileUtils.getFileBlockSize(files[0]);
+        int bufferSize = 64 * 1024 * 1024; // 64MB - large enough for the release to 
be reliably visible in the direct BufferPoolMXBean metrics
+
+        try (ChannelProxy channel = new ChannelProxy(files[0], 
ChannelProxy.IOMode.DIRECT))
+        {
+            DirectThreadLocalReadAheadBuffer tlrab =
+                new DirectThreadLocalReadAheadBuffer(channel, bufferSize, 
blockSize);
+
+            // Force buffer allocation
+            tlrab.allocateBuffer();
+
+            long memoryUsedBefore = directPool.getMemoryUsed();
+
+            // Close should clean the direct memory
+            tlrab.close();
+
+            long memoryUsedAfter = directPool.getMemoryUsed();
+
+            // Memory should decrease by approximately buffer size (+ 
alignment overhead)
+            long expectedDecrease = bufferSize;
+            long actualDecrease = memoryUsedBefore - memoryUsedAfter;
+
+            Assert.assertTrue(
+                "Direct memory should decrease after close(). " +
+                "Before: " + memoryUsedBefore + ", After: " + memoryUsedAfter +
+                ", Expected decrease: ~" + expectedDecrease + ", Actual: " + 
actualDecrease,
+                actualDecrease >= expectedDecrease * 0.9); // allow 10% slack for 
pool-accounting and rounding differences
+        }
+    }
+
+    private static BufferPoolMXBean getDirectBufferPool()
+    {
+        List<BufferPoolMXBean> pools = 
ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+        for (BufferPoolMXBean pool : pools)
+            if (pool.getName().equals("direct"))
+                return pool;
+
+        throw new IllegalStateException("Direct buffer pool not found");
+    }
+
 }
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java 
b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
index 19a70e999f..dc522a4f2e 100644
--- a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
+++ b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.transport;
 
+import java.nio.ByteBuffer;
+
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
@@ -26,6 +28,7 @@ import org.apache.cassandra.utils.memory.MemoryUtil;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import sun.nio.ch.DirectBuffer;
 
 import static org.quicktheories.QuickTheory.qt;
 
@@ -52,7 +55,12 @@ public class WriteBytesTest
             Assertions.assertThat(buf.writerIndex()).isEqualTo(size);
             for (int i = 0; i < size; i++)
                 Assertions.assertThat(buf.getByte(buf.readerIndex() + 
i)).describedAs("byte mismatch at index %d", i).isEqualTo(bb.get(bb.position() 
+ i));
-            MemoryUtil.clean(bb);
+
+            if (bb.isDirect())
+            {
+                Object attachment = ((DirectBuffer) bb).attachment();
+                MemoryUtil.clean(attachment == null ? bb : (ByteBuffer) 
attachment);
+            }
         });
     }
 
diff --git a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java 
b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
index 842823a2d5..c021af98ad 100644
--- a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
@@ -50,35 +50,31 @@ public class MemoryUtilTest
     }
 
     @Test
-    public void testCleanViewDoesNotThrow()
+    public void testCleanSliceThrows()
     {
-        // Use a large buffer to likely get mmap'd memory from the OS. This 
ensures that if cleaning a view incorrectly
-        // unmaps the original buffer's memory, subsequent access to 
'original' would more reliably fail.
-        // For context: glibc's mmap threshold is 32MB on 64-bit systems
-        ByteBuffer original = ByteBuffer.allocateDirect(64 * 1024 * 1024);
-
+        ByteBuffer original = ByteBuffer.allocateDirect(1024);
         ByteBuffer slice = original.slice();
-        MemoryUtil.clean(slice);
-        try
-        {
-            original.putInt(10);
-        }
-        catch (Exception exc)
-        {
-            Assertions.fail("Unable to write to original buffer after cleaning 
(slice). " + exc.getMessage(), exc);
-        }
 
+        Assertions.assertThatThrownBy(() -> MemoryUtil.clean(slice))
+                  .isInstanceOf(IllegalArgumentException.class);
+
+        // original should still be usable after the rejected clean
+        original.putInt(10);
+        MemoryUtil.clean(original);
+    }
+
+    @Test
+    public void testCleanDuplicateThrows()
+    {
+        ByteBuffer original = ByteBuffer.allocateDirect(1024);
         ByteBuffer duplicate = original.duplicate();
-        MemoryUtil.clean(duplicate);
 
-        try
-        {
-            original.putInt(10);
-        }
-        catch (Exception exc)
-        {
-            Assertions.fail("Unable to write to original buffer after cleaning 
(duplicate). " + exc.getMessage(), exc);
-        }
+        Assertions.assertThatThrownBy(() -> MemoryUtil.clean(duplicate))
+                  .isInstanceOf(IllegalArgumentException.class);
+
+        // original should still be usable after the rejected clean
+        original.putInt(10);
+        MemoryUtil.clean(original);
     }
 
     @Test
@@ -88,6 +84,46 @@ public class MemoryUtilTest
         MemoryUtil.clean(original);
     }
 
+    @Test
+    public void testCleanWithNonNullAttachmentAndCleanerSucceeds()
+    {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
+        MemoryUtil.setAttachment(buffer, (Runnable) () -> {});
+        MemoryUtil.clean(buffer);
+    }
+
+    @Test
+    public void testCleanNoCleanerWithAttachmentThrows()
+    {
+        ByteBuffer hollow = MemoryUtil.getHollowDirectByteBuffer();
+        MemoryUtil.setAttachment(hollow, new Object());
+
+        Assertions.assertThatThrownBy(() -> MemoryUtil.clean(hollow))
+                  .isInstanceOf(IllegalArgumentException.class)
+                  .hasMessageContaining("does not own its memory");
+    }
+
+    @Test
+    public void testCleanNoCleanerNoAttachmentIsNoOp()
+    {
+        ByteBuffer hollow = MemoryUtil.getHollowDirectByteBuffer();
+        MemoryUtil.clean(hollow);
+    }
+
+    @Test
+    public void testCleanWithCleanerButUnexpectedAttachmentThrows()
+    {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
+        MemoryUtil.setAttachment(buffer, new Object());
+
+        Assertions.assertThatThrownBy(() -> MemoryUtil.clean(buffer))
+                  .isInstanceOf(IllegalArgumentException.class)
+                  .hasMessageContaining("unexpected attachment type");
+
+        MemoryUtil.setAttachment(buffer, null);
+        MemoryUtil.clean(buffer);
+    }
+
     private static BufferPoolMXBean getDirectBufferPool()
     {
         List<BufferPoolMXBean> pools = 
ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to