This is an automated email from the ASF dual-hosted git repository.

jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 35d51725ef Add read ahead buffer for scans of compressed data files
35d51725ef is described below

commit 35d51725ef73cd6749393f3c08ecc0fab3f33513
Author: Jordan West <[email protected]>
AuthorDate: Wed Apr 17 10:27:02 2024 -0700

    Add read ahead buffer for scans of compressed data files
    
    Patch by Jordan West, Jon Haddad; Reviewed by David Capwell, Caleb 
Rackliffe, Dmitry Konstantinov for CASSANDRA-15452
    
    NOTE: This was originally merged up via a merge commit, but something went
wrong; this is the fix commit to bring this change back to trunk
---
 .../org/apache/cassandra/cache/ChunkCache.java     |   2 +-
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  21 +++
 .../cassandra/io/sstable/format/SSTableReader.java |   5 +
 .../io/sstable/format/SSTableScanner.java          |   2 +-
 .../io/util/BufferManagingRebufferer.java          |   1 +
 .../org/apache/cassandra/io/util/ChunkReader.java  |   2 +
 .../cassandra/io/util/CompressedChunkReader.java   | 202 +++++++++++++++++---
 .../apache/cassandra/io/util/EmptyRebufferer.java  |   2 +-
 .../org/apache/cassandra/io/util/FileHandle.java   |  19 +-
 .../apache/cassandra/io/util/MmapRebufferer.java   |   2 +-
 .../cassandra/io/util/RandomAccessReader.java      |   2 +-
 .../cassandra/io/util/RebuffererFactory.java       |   2 +-
 .../cassandra/io/util/SimpleChunkReader.java       |   2 +-
 .../io/util/ThreadLocalReadAheadBuffer.java        | 154 ++++++++++++++++
 .../apache/cassandra/service/StorageService.java   |  13 ++
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 .../io/util/CompressedChunkReaderTest.java         | 141 ++++++++++++++
 .../io/util/ThreadLocalReadAheadBufferTest.java    | 204 +++++++++++++++++++++
 19 files changed, 743 insertions(+), 38 deletions(-)

diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java 
b/src/java/org/apache/cassandra/cache/ChunkCache.java
index afd5804089..958007c504 100644
--- a/src/java/org/apache/cassandra/cache/ChunkCache.java
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -258,7 +258,7 @@ public class ChunkCache implements 
CacheLoader<ChunkCache.Key, ChunkCache.Buffer
         }
 
         @Override
-        public Rebufferer instantiateRebufferer()
+        public Rebufferer instantiateRebufferer(boolean isScan)
         {
             return this;
         }
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 518469a149..2ec1d78e30 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -345,6 +345,8 @@ public class Config
     @Replaces(oldName = "min_free_space_per_drive_in_mb", converter = 
Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true)
     public DataStorageSpec.IntMebibytesBound min_free_space_per_drive = new 
DataStorageSpec.IntMebibytesBound("50MiB");
 
+    public DataStorageSpec.IntKibibytesBound compressed_read_ahead_buffer_size 
= new DataStorageSpec.IntKibibytesBound("256KiB");
+
     // fraction of free disk space available for compaction after min free 
space is subtracted
     public volatile Double max_space_usable_for_compactions_in_percentage = 
.95;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4033aa7918..f6fd1b52ff 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1025,6 +1025,9 @@ public class DatabaseDescriptor
                 break;
         }
 
+        if (conf.compressed_read_ahead_buffer_size.toKibibytes() > 0 && 
conf.compressed_read_ahead_buffer_size.toKibibytes() < 256)
+            throw new 
ConfigurationException("compressed_read_ahead_buffer_size must be at least 
256KiB (set to 0 to disable), but was " + 
conf.compressed_read_ahead_buffer_size, false);
+
         if (conf.server_encryption_options != null)
         {
             conf.server_encryption_options.applyConfig();
@@ -2730,6 +2733,24 @@ public class DatabaseDescriptor
         conf.concurrent_materialized_view_builders = value;
     }
 
+    public static int getCompressedReadAheadBufferSize()
+    {
+        return conf.compressed_read_ahead_buffer_size.toBytes();
+    }
+
+    public static int getCompressedReadAheadBufferSizeInKB()
+    {
+        return conf.compressed_read_ahead_buffer_size.toKibibytes();
+    }
+
+    public static void setCompressedReadAheadBufferSizeInKb(int sizeInKb)
+    {
+        if (sizeInKb < 256)
+            throw new 
IllegalArgumentException("compressed_read_ahead_buffer_size_in_kb must be at 
least 256KiB");
+
+        conf.compressed_read_ahead_buffer_size = 
createIntKibibyteBoundAndEnsureItIsValidForByteConversion(sizeInKb, 
"compressed_read_ahead_buffer_size");
+    }
+
     public static long getMinFreeSpacePerDriveInMebibytes()
     {
         return conf.min_free_space_per_drive.toMebibytes();
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 92359e4d0c..ff488694dc 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1375,6 +1375,11 @@ public abstract class SSTableReader extends SSTable 
implements UnfilteredSource,
         return dfile.createReader();
     }
 
+    public RandomAccessReader openDataReaderForScan()
+    {
+        return dfile.createReaderForScan();
+    }
+
     public void trySkipFileCacheBefore(DecoratedKey key)
     {
         long position = getPosition(key, SSTableReader.Operator.GE);
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
index 86c0f7ec2d..28035a85da 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
@@ -75,7 +75,7 @@ implements ISSTableScanner
     {
         assert sstable != null;
 
-        this.dfile = sstable.openDataReader();
+        this.dfile = sstable.openDataReaderForScan();
         this.sstable = sstable;
         this.columns = columns;
         this.dataRange = dataRange;
diff --git 
a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java 
b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
index 3a297ee0e2..13b7c9d441 100644
--- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -50,6 +50,7 @@ public abstract class BufferManagingRebufferer implements 
Rebufferer, Rebufferer
     public void closeReader()
     {
         BufferPools.forChunkCache().put(buffer);
+        source.releaseUnderlyingResources();
         offset = -1;
     }
 
diff --git a/src/java/org/apache/cassandra/io/util/ChunkReader.java 
b/src/java/org/apache/cassandra/io/util/ChunkReader.java
index 33bf7921ed..779e7c35f9 100644
--- a/src/java/org/apache/cassandra/io/util/ChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChunkReader.java
@@ -48,4 +48,6 @@ public interface ChunkReader extends RebuffererFactory
      * This is not guaranteed to be fulfilled.
      */
     BufferType preferredBufferType();
+
+    default void releaseUnderlyingResources() {}
 }
diff --git a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java 
b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
index b0aa24bd8f..b6b3c9a6a2 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@ -26,11 +26,13 @@ import java.util.function.Supplier;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.compress.CorruptBlockException;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Closeable;
 
 public abstract class CompressedChunkReader extends AbstractReaderFileProxy 
implements ChunkReader
 {
@@ -47,6 +49,11 @@ public abstract class CompressedChunkReader extends 
AbstractReaderFileProxy impl
         assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a 
power of two
     }
 
+    protected CompressedChunkReader forScan()
+    {
+        return this;
+    }
+
     @VisibleForTesting
     public double getCrcCheckChance()
     {
@@ -83,20 +90,167 @@ public abstract class CompressedChunkReader extends 
AbstractReaderFileProxy impl
     }
 
     @Override
-    public Rebufferer instantiateRebufferer()
+    public Rebufferer instantiateRebufferer(boolean isScan)
     {
-        return new BufferManagingRebufferer.Aligned(this);
+        return new BufferManagingRebufferer.Aligned(isScan ? forScan() : this);
     }
 
-    public static class Standard extends CompressedChunkReader
+    protected interface CompressedReader extends Closeable
     {
-        // we read the raw compressed bytes into this buffer, then 
uncompressed them into the provided one.
+        default void allocateResources()
+        {
+        }
+
+        default void deallocateResources()
+        {
+        }
+
+        default boolean allocated()
+        {
+            return false;
+        }
+
+        default void close()
+        {
+
+        }
+
+
+        ByteBuffer read(CompressionMetadata.Chunk chunk, boolean 
shouldCheckCrc) throws CorruptBlockException;
+    }
+
+    private static class RandomAccessCompressedReader implements 
CompressedReader
+    {
+        private final ChannelProxy channel;
         private final ThreadLocalByteBufferHolder bufferHolder;
 
+        private RandomAccessCompressedReader(ChannelProxy channel, 
CompressionMetadata metadata)
+        {
+            this.channel = channel;
+            this.bufferHolder = new 
ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
+        }
+
+        @Override
+        public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean 
shouldCheckCrc) throws CorruptBlockException
+        {
+            int length = shouldCheckCrc ? chunk.length + Integer.BYTES // 
compressed length + checksum length
+                                        : chunk.length;
+            ByteBuffer compressed = bufferHolder.getBuffer(length);
+            if (channel.read(compressed, chunk.offset) != length)
+                throw new CorruptBlockException(channel.filePath(), chunk);
+            compressed.flip();
+            compressed.limit(chunk.length);
+
+            if (shouldCheckCrc)
+            {
+                int checksum = (int) ChecksumType.CRC32.of(compressed);
+                compressed.limit(length);
+                if (compressed.getInt() != checksum)
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                compressed.position(0).limit(chunk.length);
+            }
+            return compressed;
+        }
+    }
+
+    private static class ScanCompressedReader implements CompressedReader
+    {
+        private final ChannelProxy channel;
+        private final ThreadLocalByteBufferHolder bufferHolder;
+        private final ThreadLocalReadAheadBuffer readAheadBuffer;
+
+        private ScanCompressedReader(ChannelProxy channel, CompressionMetadata 
metadata, int readAheadBufferSize)
+        {
+            this.channel = channel;
+            this.bufferHolder = new 
ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
+            this.readAheadBuffer = new ThreadLocalReadAheadBuffer(channel, 
readAheadBufferSize, metadata.compressor().preferredBufferType());
+        }
+
+        @Override
+        public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean 
shouldCheckCrc) throws CorruptBlockException
+        {
+            int length = shouldCheckCrc ? chunk.length + Integer.BYTES // 
compressed length + checksum length
+                                        : chunk.length;
+            ByteBuffer compressed = bufferHolder.getBuffer(length);
+
+            int copied = 0;
+            while (copied < length)
+            {
+                readAheadBuffer.fill(chunk.offset + copied);
+                int leftToRead = length - copied;
+                if (readAheadBuffer.remaining() >= leftToRead)
+                    copied += readAheadBuffer.read(compressed, leftToRead);
+                else
+                    copied += readAheadBuffer.read(compressed, 
readAheadBuffer.remaining());
+            }
+
+            compressed.flip();
+            compressed.limit(chunk.length);
+
+            if (shouldCheckCrc)
+            {
+                int checksum = (int) ChecksumType.CRC32.of(compressed);
+                compressed.limit(length);
+                if (compressed.getInt() != checksum)
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                compressed.position(0).limit(chunk.length);
+            }
+            return compressed;
+        }
+
+        @Override
+        public void allocateResources()
+        {
+            readAheadBuffer.allocateBuffer();
+        }
+
+        @Override
+        public void deallocateResources()
+        {
+            readAheadBuffer.clear(true);
+        }
+
+        @Override
+        public boolean allocated()
+        {
+            return readAheadBuffer.hasBuffer();
+        }
+
+        public void close()
+        {
+            readAheadBuffer.close();
+        }
+    }
+
+    public static class Standard extends CompressedChunkReader
+    {
+
+        private final CompressedReader reader;
+        private final CompressedReader scanReader;
+
         public Standard(ChannelProxy channel, CompressionMetadata metadata, 
Supplier<Double> crcCheckChanceSupplier)
         {
             super(channel, metadata, crcCheckChanceSupplier);
-            bufferHolder = new 
ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
+            reader = new RandomAccessCompressedReader(channel, metadata);
+
+            int readAheadBufferSize = 
DatabaseDescriptor.getCompressedReadAheadBufferSize();
+            scanReader = (readAheadBufferSize > 0 && readAheadBufferSize > 
metadata.chunkLength())
+                         ? new ScanCompressedReader(channel, metadata, 
readAheadBufferSize) : null;
+        }
+
+        protected CompressedChunkReader forScan()
+        {
+            if (scanReader != null)
+                scanReader.allocateResources();
+
+            return this;
+        }
+
+        @Override
+        public void releaseUnderlyingResources()
+        {
+            if (scanReader != null)
+                scanReader.deallocateResources();
         }
 
         @Override
@@ -110,31 +264,13 @@ public abstract class CompressedChunkReader extends 
AbstractReaderFileProxy impl
 
                 CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
                 boolean shouldCheckCrc = shouldCheckCrc();
-                int length = shouldCheckCrc ? chunk.length + Integer.BYTES // 
compressed length + checksum length
-                                            : chunk.length;
 
+                CompressedReader readFrom = (scanReader != null && 
scanReader.allocated()) ? scanReader : reader;
                 if (chunk.length < maxCompressedLength)
                 {
-                    ByteBuffer compressed = bufferHolder.getBuffer(length);
-
-                    if (channel.read(compressed, chunk.offset) != length)
-                        throw new CorruptBlockException(channel.filePath(), 
chunk);
-
-                    compressed.flip();
-                    compressed.limit(chunk.length);
+                    ByteBuffer compressed = readFrom.read(chunk, 
shouldCheckCrc);
                     uncompressed.clear();
 
-                    if (shouldCheckCrc)
-                    {
-                        int checksum = (int) ChecksumType.CRC32.of(compressed);
-
-                        compressed.limit(length);
-                        if (compressed.getInt() != checksum)
-                            throw new 
CorruptBlockException(channel.filePath(), chunk);
-
-                        compressed.position(0).limit(chunk.length);
-                    }
-
                     try
                     {
                         metadata.compressor().uncompress(compressed, 
uncompressed);
@@ -155,10 +291,9 @@ public abstract class CompressedChunkReader extends 
AbstractReaderFileProxy impl
                         uncompressed.flip();
                         int checksum = (int) 
ChecksumType.CRC32.of(uncompressed);
 
-                        ByteBuffer scratch = 
bufferHolder.getBuffer(Integer.BYTES);
-
+                        ByteBuffer scratch = 
ByteBuffer.allocate(Integer.BYTES);
                         if (channel.read(scratch, chunk.offset + chunk.length) 
!= Integer.BYTES
-                                || scratch.getInt(0) != checksum)
+                            || scratch.getInt(0) != checksum)
                             throw new 
CorruptBlockException(channel.filePath(), chunk);
                     }
                 }
@@ -171,6 +306,16 @@ public abstract class CompressedChunkReader extends 
AbstractReaderFileProxy impl
                 throw new CorruptSSTableException(e, channel.filePath());
             }
         }
+
+        @Override
+        public void close()
+        {
+            reader.close();
+            if (scanReader != null)
+                scanReader.close();
+
+            super.close();
+        }
     }
 
     public static class Mmap extends CompressedChunkReader
@@ -233,7 +378,6 @@ public abstract class CompressedChunkReader extends 
AbstractReaderFileProxy impl
                 uncompressed.position(0).limit(0);
                 throw new CorruptSSTableException(e, channel.filePath());
             }
-
         }
 
         public void close()
diff --git a/src/java/org/apache/cassandra/io/util/EmptyRebufferer.java 
b/src/java/org/apache/cassandra/io/util/EmptyRebufferer.java
index aa8e7e046f..7f54a6b180 100644
--- a/src/java/org/apache/cassandra/io/util/EmptyRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/EmptyRebufferer.java
@@ -64,7 +64,7 @@ public class EmptyRebufferer implements Rebufferer, 
RebuffererFactory
     }
 
     @Override
-    public Rebufferer instantiateRebufferer()
+    public Rebufferer instantiateRebufferer(boolean isScan)
     {
         return this;
     }
diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java 
b/src/java/org/apache/cassandra/io/util/FileHandle.java
index 943355d01d..67bfd239d6 100644
--- a/src/java/org/apache/cassandra/io/util/FileHandle.java
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -134,6 +134,11 @@ public class FileHandle extends SharedCloseableImpl
         return createReader(null);
     }
 
+    public RandomAccessReader createReaderForScan()
+    {
+        return createReader(null, true);
+    }
+
     /**
      * Create {@link RandomAccessReader} with configured method of reading 
content of the file.
      * Reading from file will be rate limited by given {@link RateLimiter}.
@@ -143,7 +148,12 @@ public class FileHandle extends SharedCloseableImpl
      */
     public RandomAccessReader createReader(RateLimiter limiter)
     {
-        return new RandomAccessReader(instantiateRebufferer(limiter));
+        return createReader(limiter, false);
+    }
+
+    public RandomAccessReader createReader(RateLimiter limiter, boolean 
forScan)
+    {
+       return new RandomAccessReader(instantiateRebufferer(limiter, forScan));
     }
 
     public FileDataInput createReader(long position)
@@ -186,7 +196,12 @@ public class FileHandle extends SharedCloseableImpl
 
     public Rebufferer instantiateRebufferer(RateLimiter limiter)
     {
-        Rebufferer rebufferer = rebuffererFactory.instantiateRebufferer();
+        return instantiateRebufferer(limiter, false);
+    }
+
+    public Rebufferer instantiateRebufferer(RateLimiter limiter, boolean 
forScan)
+    {
+        Rebufferer rebufferer = 
rebuffererFactory.instantiateRebufferer(forScan);
 
         if (limiter != null)
             rebufferer = new LimitingRebufferer(rebufferer, limiter, 
DiskOptimizationStrategy.MAX_BUFFER_SIZE);
diff --git a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java 
b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
index 8df6370c5e..884bc97186 100644
--- a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
@@ -41,7 +41,7 @@ class MmapRebufferer extends AbstractReaderFileProxy 
implements Rebufferer, Rebu
     }
 
     @Override
-    public Rebufferer instantiateRebufferer()
+    public Rebufferer instantiateRebufferer(boolean isScan)
     {
         return this;
     }
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java 
b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 3ce1a2eb08..b89e59eb52 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -332,7 +332,7 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
         try
         {
             ChunkReader reader = new SimpleChunkReader(channel, -1, 
BufferType.OFF_HEAP, DEFAULT_BUFFER_SIZE);
-            Rebufferer rebufferer = reader.instantiateRebufferer();
+            Rebufferer rebufferer = reader.instantiateRebufferer(false);
             return new RandomAccessReaderWithOwnChannel(rebufferer);
         }
         catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/io/util/RebuffererFactory.java 
b/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
index ec35f0ba53..192fb8ea0c 100644
--- a/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
+++ b/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
@@ -28,5 +28,5 @@ package org.apache.cassandra.io.util;
  */
 public interface RebuffererFactory extends ReaderFileProxy
 {
-    Rebufferer instantiateRebufferer();
+    Rebufferer instantiateRebufferer(boolean isScan);
 }
diff --git a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java 
b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
index 8d00ce5d40..fec1216bf4 100644
--- a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
@@ -55,7 +55,7 @@ class SimpleChunkReader extends AbstractReaderFileProxy 
implements ChunkReader
     }
 
     @Override
-    public Rebufferer instantiateRebufferer()
+    public Rebufferer instantiateRebufferer(boolean forScan)
     {
         if (Integer.bitCount(bufferSize) == 1)
             return new BufferManagingRebufferer.Aligned(this);
diff --git 
a/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java 
b/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
new file mode 100644
index 0000000000..824acaa8d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+
+public final class ThreadLocalReadAheadBuffer
+{
+    private static class Block
+    {
+        ByteBuffer buffer = null;
+        int index = -1;
+    }
+
+    private final ChannelProxy channel;
+
+    private final BufferType bufferType;
+
+    private static final FastThreadLocal<Map<String, Block>> blockMap = new 
FastThreadLocal<>()
+    {
+        @Override
+        protected Map<String, Block> initialValue()
+        {
+            return new HashMap<>();
+        }
+    };
+
+    private final int bufferSize;
+    private final long channelSize;
+
+    public ThreadLocalReadAheadBuffer(ChannelProxy channel, int bufferSize, 
BufferType bufferType)
+    {
+        this.channel = channel;
+        this.channelSize = channel.size();
+        this.bufferSize = bufferSize;
+        this.bufferType = bufferType;
+    }
+
+    public boolean hasBuffer()
+    {
+        return block().buffer != null;
+    }
+
+    /**
+     * Safe to call only if {@link #hasBuffer()} is true
+     */
+    public int remaining()
+    {
+        return getBlock().buffer.remaining();
+    }
+
+    public void allocateBuffer()
+    {
+        getBlock();
+    }
+
+    private Block getBlock()
+    {
+        Block block = block();
+        if (block.buffer == null)
+        {
+            block.buffer = bufferType.allocate(bufferSize);
+            block.buffer.clear();
+        }
+        return block;
+    }
+
+    private Block block()
+    {
+        return blockMap.get().computeIfAbsent(channel.filePath(), k -> new 
Block());
+    }
+
+    public void fill(long position)
+    {
+        Block block = getBlock();
+        ByteBuffer blockBuffer = block.buffer;
+        long realPosition = Math.min(channelSize, position);
+        int blockNo = (int) (realPosition / bufferSize);
+        long blockPosition = blockNo * (long) bufferSize;
+
+        long remaining = channelSize - blockPosition;
+        int sizeToRead = (int) Math.min(remaining, bufferSize);
+        if (block.index != blockNo)
+        {
+            blockBuffer.flip();
+            blockBuffer.limit(sizeToRead);
+            if (channel.read(blockBuffer, blockPosition) != sizeToRead)
+                throw new CorruptSSTableException(null, channel.filePath());
+
+            block.index = blockNo;
+        }
+
+        blockBuffer.flip();
+        blockBuffer.limit(sizeToRead);
+        blockBuffer.position((int) (realPosition - blockPosition));
+    }
+
+    public int read(ByteBuffer dest, int length)
+    {
+        Block block = getBlock();
+        ByteBuffer blockBuffer = block.buffer;
+        ByteBuffer tmp = blockBuffer.duplicate();
+        tmp.limit(tmp.position() + length);
+        dest.put(tmp);
+        blockBuffer.position(blockBuffer.position() + length);
+
+        return length;
+    }
+
+    public void clear(boolean deallocate)
+    {
+        Block block = getBlock();
+        block.index = -1;
+
+        ByteBuffer blockBuffer = block.buffer;
+        if (blockBuffer != null)
+        {
+            blockBuffer.clear();
+            if (deallocate)
+            {
+                FileUtils.clean(blockBuffer);
+                block.buffer = null;
+            }
+        }
+    }
+
+    public void close()
+    {
+        clear(true);
+        blockMap.get().remove(channel.filePath());
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 5d90bcb572..53f815a586 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1430,6 +1430,19 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return result;
     }
 
+    @Override
+    public int getCompressedReadAheadBufferInKB()
+    {
+        return DatabaseDescriptor.getCompressedReadAheadBufferSizeInKB();
+    }
+
+    @Override
+    public void setCompressedReadAheadBufferInKB(int sizeInKb)
+    {
+        DatabaseDescriptor.setCompressedReadAheadBufferSizeInKb(sizeInKb);
+        logger.info("set compressed read ahead buffer size to {}KiB", 
sizeInKb);
+    }
+
     public int getBatchlogReplayThrottleInKB()
     {
         return DatabaseDescriptor.getBatchlogReplayThrottleInKiB();
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index bfb9eb97b7..7760ea0188 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -862,6 +862,9 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     public void setCompactionThroughputMbPerSec(int value);
     Map<String, String> getCurrentCompactionThroughputMebibytesPerSec();
 
+    public int getCompressedReadAheadBufferInKB();
+    public void setCompressedReadAheadBufferInKB(int sizeInKb);
+
     public int getBatchlogReplayThrottleInKB();
     public void setBatchlogReplayThrottleInKB(int value);
 
diff --git 
a/test/unit/org/apache/cassandra/io/util/CompressedChunkReaderTest.java 
b/test/unit/org/apache/cassandra/io/util/CompressedChunkReaderTest.java
new file mode 100644
index 0000000000..af4b458fec
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/CompressedChunkReaderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.CompressionParams;
+import org.assertj.core.api.Assertions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static accord.utils.Property.qt;
+
+public class CompressedChunkReaderTest
+{
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    @Test
+    public void scanReaderReadsLessThanRAReader()
+    {
+        var optionGen = options();
+        var paramsGen = params();
+        var lengthGen = Gens.longs().between(1, 1 << 16);
+        qt().withSeed(-1871070464864118891L).forAll(Gens.random(), optionGen, 
paramsGen).check((rs, option, params) -> {
+            ListenableFileSystem fs = 
FileSystems.newGlobalInMemoryFileSystem();
+
+            File f = new File("/file.db");
+            AtomicInteger reads = new AtomicInteger();
+            fs.onPostRead(f.path::equals, (p, c, pos, dst, r) -> {
+                reads.incrementAndGet();
+            });
+            long length = lengthGen.nextLong(rs);
+            CompressionMetadata metadata1, metadata2;
+            try (CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, new File("/file.offset"), new 
File("/file.digest"), option, params, new MetadataCollector(new 
ClusteringComparator())))
+            {
+                for (long i = 0; i < length; i++)
+                    writer.writeLong(i);
+
+                writer.sync();
+                metadata1 = writer.open(0);
+                metadata2 = writer.open(0);
+            }
+
+            doReads(f, metadata1, length, true);
+            int scanReads = reads.getAndSet(0);
+
+            doReads(f, metadata2, length, false);
+            int raReads = reads.getAndSet(0);
+            
+            if (Files.size(f.toPath()) > 
DatabaseDescriptor.getCompressedReadAheadBufferSize())
+                Assert.assertTrue(scanReads < raReads);
+        });
+    }
+
+    private void doReads(File f, CompressionMetadata metadata, long length, 
boolean useReadAhead)
+    {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(metadata.chunkLength());
+
+        try (ChannelProxy channel = new ChannelProxy(f);
+             CompressedChunkReader reader = new 
CompressedChunkReader.Standard(channel, metadata, () -> 1.1);
+             metadata)
+        {
+            if (useReadAhead)
+                reader.forScan();
+
+            long offset = 0;
+            long maxOffset = length * Long.BYTES;
+            do
+            {
+                reader.readChunk(offset, buffer);
+                for (long expected = offset / Long.BYTES; 
buffer.hasRemaining(); expected++)
+                    
Assertions.assertThat(buffer.getLong()).isEqualTo(expected);
+
+                offset += metadata.chunkLength();
+            }
+            while (offset < maxOffset);
+        }
+        finally
+        {
+            FileUtils.clean(buffer);
+        }}
+
+    private static Gen<SequentialWriterOption> options()
+    {
+        Gen<Integer> bufferSizes = Gens.constant(1 << 10); //.pickInt(1 << 4, 
1 << 10, 1 << 15);
+        return rs -> SequentialWriterOption.newBuilder()
+                                           .finishOnClose(false)
+                                           .bufferSize(bufferSizes.next(rs))
+                                           .build();
+    }
+
+    private enum CompressionKind { Noop, Snappy, Deflate, Lz4, Zstd }
+
+    private static Gen<CompressionParams> params()
+    {
+        Gen<Integer> chunkLengths = 
Gens.constant(CompressionParams.DEFAULT_CHUNK_LENGTH);
+        Gen<Double> compressionRatio = Gens.pick(1.1D);
+        return rs -> {
+            CompressionKind kind = rs.pick(CompressionKind.values());
+            switch (kind)
+            {
+                case Noop: return CompressionParams.noop();
+                case Snappy: return 
CompressionParams.snappy(chunkLengths.next(rs), compressionRatio.next(rs));
+                case Deflate: return 
CompressionParams.deflate(chunkLengths.next(rs));
+                case Lz4: return CompressionParams.lz4(chunkLengths.next(rs));
+                case Zstd: return 
CompressionParams.zstd(chunkLengths.next(rs));
+                default: throw new UnsupportedOperationException(kind.name());
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git 
a/test/unit/org/apache/cassandra/io/util/ThreadLocalReadAheadBufferTest.java 
b/test/unit/org/apache/cassandra/io/util/ThreadLocalReadAheadBufferTest.java
new file mode 100644
index 0000000000..4d43017b2a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/ThreadLocalReadAheadBufferTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.utils.Pair;
+import org.quicktheories.WithQuickTheories;
+import org.quicktheories.core.Gen;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.JAVA_IO_TMPDIR;
+
+public class ThreadLocalReadAheadBufferTest implements WithQuickTheories
+{
+    private static final int numFiles = 5;
+    private static final File[] files = new File[numFiles];
+    private static final Logger logger = 
LoggerFactory.getLogger(ThreadLocalReadAheadBufferTest.class);
+
+    @BeforeClass
+    public static void setup()
+    {
+        int seed = new Random().nextInt();
+        logger.info("Seed: {}", seed);
+
+        for (int i = 0; i < numFiles; i++)
+        {
+            int size = new Random().nextInt((Integer.MAX_VALUE - 1) / 8);
+            files[i] = writeFile(seed, size);
+        }
+    }
+
+    @AfterClass
+    public static void cleanup()
+    {
+        for (File f : files)
+        {
+            try
+            {
+                f.delete();
+            }
+            catch (Exception e)
+            {
+               // ignore
+            }
+        }
+    }
+
+    @Test
+    public void testLastBlockReads()
+    {
+        qt().forAll(lastBlockReads())
+            .checkAssert(this::testReads);
+    }
+
+    @Test
+    public void testReadsLikeChannelProxy()
+    {
+
+        qt().forAll(randomReads())
+            .checkAssert(this::testReads);
+    }
+
+    private void testReads(InputData propertyInputs)
+    {
+        try (ChannelProxy channel = new ChannelProxy(propertyInputs.file))
+        {
+            ThreadLocalReadAheadBuffer trlab = new 
ThreadLocalReadAheadBuffer(channel, new 
DataStorageSpec.IntKibibytesBound("256KiB").toBytes(), BufferType.OFF_HEAP);
+            for (Pair<Long, Integer> read : propertyInputs.positionsAndLengths)
+            {
+                int readSize = Math.min(read.right,(int) (channel.size() - 
read.left));
+                ByteBuffer buf1 = ByteBuffer.allocate(readSize);
+                channel.read(buf1, read.left);
+
+                ByteBuffer buf2 = ByteBuffer.allocate(readSize);
+                try
+                {
+                    int copied = 0;
+                    while (copied < readSize) {
+                        trlab.fill(read.left + copied);
+                        int leftToRead = readSize - copied;
+                        if (trlab.remaining() >= leftToRead)
+                            copied += trlab.read(buf2, leftToRead);
+                        else
+                            copied += trlab.read(buf2, trlab.remaining());
+                    }
+                }
+                catch (CorruptSSTableException e)
+                {
+                    throw new RuntimeException(e);
+                }
+
+                Assert.assertEquals(buf1, buf2);
+            }
+        }
+    }
+
+    private Gen<InputData> lastBlockReads()
+    {
+        return arbitrary().pick(List.of(files))
+                          .flatMap((file) ->
+                                   lists().of(longs().between(0, 
fileSize(file)).zip(integers().between(1, 100), Pair::create))
+                                          .ofSizeBetween(5, 10)
+                                          .map(positionsAndLengths -> new 
InputData(file, positionsAndLengths)));
+
+    }
+
+    private Gen<InputData> randomReads()
+    {
+        int blockSize = new 
DataStorageSpec.IntKibibytesBound("256KiB").toBytes();
+        return arbitrary().pick(List.of(files))
+                         .flatMap((file) ->
+                                  lists().of(longs().between(fileSize(file) - 
blockSize, fileSize(file)).zip(integers().between(1, 100), Pair::create))
+                                         .ofSizeBetween(5, 10)
+                                         .map(positionsAndLengths -> new 
InputData(file, positionsAndLengths)));
+
+    }
+
+    // need this because generators don't handle the IOException
+    private long fileSize(File file)
+    {
+        try
+        {
+            return Files.size(file.toPath());
+        } catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static class InputData
+    {
+
+        private final File file;
+        private final List<Pair<Long, Integer>> positionsAndLengths;
+
+        public InputData(File file, List<Pair<Long, Integer>> 
positionsAndLengths)
+        {
+            this.file = file;
+            this.positionsAndLengths = positionsAndLengths;
+        }
+    }
+
+    private static File writeFile(int seed, int length)
+    {
+        String fileName = JAVA_IO_TMPDIR.getString() + "data+" + length + 
".bin";
+
+        byte[] dataChunk = new byte[4096 * 8];
+        java.util.Random random = new Random(seed);
+        int writtenData = 0;
+
+        File file = new File(fileName);
+        try (FileOutputStream fos = new FileOutputStream(file.toJavaIOFile()))
+        {
+            while (writtenData < length)
+            {
+                random.nextBytes(dataChunk);
+                int toWrite = Math.min((length - writtenData), 
dataChunk.length);
+                fos.write(dataChunk, 0, toWrite);
+                writtenData += toWrite;
+            }
+            fos.flush();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        return file;
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to