This is an automated email from the ASF dual-hosted git repository.
jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 35d51725ef Add read ahead buffer for scans of compressed data files
35d51725ef is described below
commit 35d51725ef73cd6749393f3c08ecc0fab3f33513
Author: Jordan West <[email protected]>
AuthorDate: Wed Apr 17 10:27:02 2024 -0700
Add read ahead buffer for scans of compressed data files
Patch by Jordan West and Jon Haddad; Reviewed by David Capwell, Caleb
Rackliffe, and Dmitry Konstantinov for CASSANDRA-15452
NOTE: This change was originally merged to trunk via a merge commit, but
that merge went wrong; this is the fix commit that brings it back to trunk.
---
.../org/apache/cassandra/cache/ChunkCache.java | 2 +-
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 21 +++
.../cassandra/io/sstable/format/SSTableReader.java | 5 +
.../io/sstable/format/SSTableScanner.java | 2 +-
.../io/util/BufferManagingRebufferer.java | 1 +
.../org/apache/cassandra/io/util/ChunkReader.java | 2 +
.../cassandra/io/util/CompressedChunkReader.java | 202 +++++++++++++++++---
.../apache/cassandra/io/util/EmptyRebufferer.java | 2 +-
.../org/apache/cassandra/io/util/FileHandle.java | 19 +-
.../apache/cassandra/io/util/MmapRebufferer.java | 2 +-
.../cassandra/io/util/RandomAccessReader.java | 2 +-
.../cassandra/io/util/RebuffererFactory.java | 2 +-
.../cassandra/io/util/SimpleChunkReader.java | 2 +-
.../io/util/ThreadLocalReadAheadBuffer.java | 154 ++++++++++++++++
.../apache/cassandra/service/StorageService.java | 13 ++
.../cassandra/service/StorageServiceMBean.java | 3 +
.../io/util/CompressedChunkReaderTest.java | 141 ++++++++++++++
.../io/util/ThreadLocalReadAheadBufferTest.java | 204 +++++++++++++++++++++
19 files changed, 743 insertions(+), 38 deletions(-)
diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java
b/src/java/org/apache/cassandra/cache/ChunkCache.java
index afd5804089..958007c504 100644
--- a/src/java/org/apache/cassandra/cache/ChunkCache.java
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -258,7 +258,7 @@ public class ChunkCache implements
CacheLoader<ChunkCache.Key, ChunkCache.Buffer
}
@Override
- public Rebufferer instantiateRebufferer()
+ public Rebufferer instantiateRebufferer(boolean isScan)
{
return this;
}
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 518469a149..2ec1d78e30 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -345,6 +345,8 @@ public class Config
@Replaces(oldName = "min_free_space_per_drive_in_mb", converter =
Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true)
public DataStorageSpec.IntMebibytesBound min_free_space_per_drive = new
DataStorageSpec.IntMebibytesBound("50MiB");
+ public DataStorageSpec.IntKibibytesBound compressed_read_ahead_buffer_size
= new DataStorageSpec.IntKibibytesBound("256KiB");
+
// fraction of free disk space available for compaction after min free
space is subtracted
public volatile Double max_space_usable_for_compactions_in_percentage =
.95;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4033aa7918..f6fd1b52ff 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1025,6 +1025,9 @@ public class DatabaseDescriptor
break;
}
+ if (conf.compressed_read_ahead_buffer_size.toKibibytes() > 0 &&
conf.compressed_read_ahead_buffer_size.toKibibytes() < 256)
+ throw new
ConfigurationException("compressed_read_ahead_buffer_size must be at least
256KiB (set to 0 to disable), but was " +
conf.compressed_read_ahead_buffer_size, false);
+
if (conf.server_encryption_options != null)
{
conf.server_encryption_options.applyConfig();
@@ -2730,6 +2733,24 @@ public class DatabaseDescriptor
conf.concurrent_materialized_view_builders = value;
}
+ public static int getCompressedReadAheadBufferSize()
+ {
+ return conf.compressed_read_ahead_buffer_size.toBytes();
+ }
+
+ public static int getCompressedReadAheadBufferSizeInKB()
+ {
+ return conf.compressed_read_ahead_buffer_size.toKibibytes();
+ }
+
+ public static void setCompressedReadAheadBufferSizeInKb(int sizeInKb)
+ {
+ if (sizeInKb < 256)
+ throw new
IllegalArgumentException("compressed_read_ahead_buffer_size_in_kb must be at
least 256KiB");
+
+ conf.compressed_read_ahead_buffer_size =
createIntKibibyteBoundAndEnsureItIsValidForByteConversion(sizeInKb,
"compressed_read_ahead_buffer_size");
+ }
+
public static long getMinFreeSpacePerDriveInMebibytes()
{
return conf.min_free_space_per_drive.toMebibytes();
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 92359e4d0c..ff488694dc 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1375,6 +1375,11 @@ public abstract class SSTableReader extends SSTable
implements UnfilteredSource,
return dfile.createReader();
}
+ public RandomAccessReader openDataReaderForScan()
+ {
+ return dfile.createReaderForScan();
+ }
+
public void trySkipFileCacheBefore(DecoratedKey key)
{
long position = getPosition(key, SSTableReader.Operator.GE);
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
index 86c0f7ec2d..28035a85da 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
@@ -75,7 +75,7 @@ implements ISSTableScanner
{
assert sstable != null;
- this.dfile = sstable.openDataReader();
+ this.dfile = sstable.openDataReaderForScan();
this.sstable = sstable;
this.columns = columns;
this.dataRange = dataRange;
diff --git
a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
index 3a297ee0e2..13b7c9d441 100644
--- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -50,6 +50,7 @@ public abstract class BufferManagingRebufferer implements
Rebufferer, Rebufferer
public void closeReader()
{
BufferPools.forChunkCache().put(buffer);
+ source.releaseUnderlyingResources();
offset = -1;
}
diff --git a/src/java/org/apache/cassandra/io/util/ChunkReader.java
b/src/java/org/apache/cassandra/io/util/ChunkReader.java
index 33bf7921ed..779e7c35f9 100644
--- a/src/java/org/apache/cassandra/io/util/ChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChunkReader.java
@@ -48,4 +48,6 @@ public interface ChunkReader extends RebuffererFactory
* This is not guaranteed to be fulfilled.
*/
BufferType preferredBufferType();
+
+ default void releaseUnderlyingResources() {}
}
diff --git a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
index b0aa24bd8f..b6b3c9a6a2 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@ -26,11 +26,13 @@ import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.compress.CorruptBlockException;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Closeable;
public abstract class CompressedChunkReader extends AbstractReaderFileProxy
implements ChunkReader
{
@@ -47,6 +49,11 @@ public abstract class CompressedChunkReader extends
AbstractReaderFileProxy impl
assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a
power of two
}
+ protected CompressedChunkReader forScan()
+ {
+ return this;
+ }
+
@VisibleForTesting
public double getCrcCheckChance()
{
@@ -83,20 +90,167 @@ public abstract class CompressedChunkReader extends
AbstractReaderFileProxy impl
}
@Override
- public Rebufferer instantiateRebufferer()
+ public Rebufferer instantiateRebufferer(boolean isScan)
{
- return new BufferManagingRebufferer.Aligned(this);
+ return new BufferManagingRebufferer.Aligned(isScan ? forScan() : this);
}
- public static class Standard extends CompressedChunkReader
+ protected interface CompressedReader extends Closeable
{
- // we read the raw compressed bytes into this buffer, then
uncompressed them into the provided one.
+ default void allocateResources()
+ {
+ }
+
+ default void deallocateResources()
+ {
+ }
+
+ default boolean allocated()
+ {
+ return false;
+ }
+
+ default void close()
+ {
+
+ }
+
+
+ ByteBuffer read(CompressionMetadata.Chunk chunk, boolean
shouldCheckCrc) throws CorruptBlockException;
+ }
+
+ private static class RandomAccessCompressedReader implements
CompressedReader
+ {
+ private final ChannelProxy channel;
private final ThreadLocalByteBufferHolder bufferHolder;
+ private RandomAccessCompressedReader(ChannelProxy channel,
CompressionMetadata metadata)
+ {
+ this.channel = channel;
+ this.bufferHolder = new
ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
+ }
+
+ @Override
+ public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean
shouldCheckCrc) throws CorruptBlockException
+ {
+ int length = shouldCheckCrc ? chunk.length + Integer.BYTES //
compressed length + checksum length
+ : chunk.length;
+ ByteBuffer compressed = bufferHolder.getBuffer(length);
+ if (channel.read(compressed, chunk.offset) != length)
+ throw new CorruptBlockException(channel.filePath(), chunk);
+ compressed.flip();
+ compressed.limit(chunk.length);
+
+ if (shouldCheckCrc)
+ {
+ int checksum = (int) ChecksumType.CRC32.of(compressed);
+ compressed.limit(length);
+ if (compressed.getInt() != checksum)
+ throw new CorruptBlockException(channel.filePath(), chunk);
+ compressed.position(0).limit(chunk.length);
+ }
+ return compressed;
+ }
+ }
+
+ private static class ScanCompressedReader implements CompressedReader
+ {
+ private final ChannelProxy channel;
+ private final ThreadLocalByteBufferHolder bufferHolder;
+ private final ThreadLocalReadAheadBuffer readAheadBuffer;
+
+ private ScanCompressedReader(ChannelProxy channel, CompressionMetadata
metadata, int readAheadBufferSize)
+ {
+ this.channel = channel;
+ this.bufferHolder = new
ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
+ this.readAheadBuffer = new ThreadLocalReadAheadBuffer(channel,
readAheadBufferSize, metadata.compressor().preferredBufferType());
+ }
+
+ @Override
+ public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean
shouldCheckCrc) throws CorruptBlockException
+ {
+ int length = shouldCheckCrc ? chunk.length + Integer.BYTES //
compressed length + checksum length
+ : chunk.length;
+ ByteBuffer compressed = bufferHolder.getBuffer(length);
+
+ int copied = 0;
+ while (copied < length)
+ {
+ readAheadBuffer.fill(chunk.offset + copied);
+ int leftToRead = length - copied;
+ if (readAheadBuffer.remaining() >= leftToRead)
+ copied += readAheadBuffer.read(compressed, leftToRead);
+ else
+ copied += readAheadBuffer.read(compressed,
readAheadBuffer.remaining());
+ }
+
+ compressed.flip();
+ compressed.limit(chunk.length);
+
+ if (shouldCheckCrc)
+ {
+ int checksum = (int) ChecksumType.CRC32.of(compressed);
+ compressed.limit(length);
+ if (compressed.getInt() != checksum)
+ throw new CorruptBlockException(channel.filePath(), chunk);
+ compressed.position(0).limit(chunk.length);
+ }
+ return compressed;
+ }
+
+ @Override
+ public void allocateResources()
+ {
+ readAheadBuffer.allocateBuffer();
+ }
+
+ @Override
+ public void deallocateResources()
+ {
+ readAheadBuffer.clear(true);
+ }
+
+ @Override
+ public boolean allocated()
+ {
+ return readAheadBuffer.hasBuffer();
+ }
+
+ public void close()
+ {
+ readAheadBuffer.close();
+ }
+ }
+
+ public static class Standard extends CompressedChunkReader
+ {
+
+ private final CompressedReader reader;
+ private final CompressedReader scanReader;
+
public Standard(ChannelProxy channel, CompressionMetadata metadata,
Supplier<Double> crcCheckChanceSupplier)
{
super(channel, metadata, crcCheckChanceSupplier);
- bufferHolder = new
ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
+ reader = new RandomAccessCompressedReader(channel, metadata);
+
+ int readAheadBufferSize =
DatabaseDescriptor.getCompressedReadAheadBufferSize();
+ scanReader = (readAheadBufferSize > 0 && readAheadBufferSize >
metadata.chunkLength())
+ ? new ScanCompressedReader(channel, metadata,
readAheadBufferSize) : null;
+ }
+
+ protected CompressedChunkReader forScan()
+ {
+ if (scanReader != null)
+ scanReader.allocateResources();
+
+ return this;
+ }
+
+ @Override
+ public void releaseUnderlyingResources()
+ {
+ if (scanReader != null)
+ scanReader.deallocateResources();
}
@Override
@@ -110,31 +264,13 @@ public abstract class CompressedChunkReader extends
AbstractReaderFileProxy impl
CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
boolean shouldCheckCrc = shouldCheckCrc();
- int length = shouldCheckCrc ? chunk.length + Integer.BYTES //
compressed length + checksum length
- : chunk.length;
+ CompressedReader readFrom = (scanReader != null &&
scanReader.allocated()) ? scanReader : reader;
if (chunk.length < maxCompressedLength)
{
- ByteBuffer compressed = bufferHolder.getBuffer(length);
-
- if (channel.read(compressed, chunk.offset) != length)
- throw new CorruptBlockException(channel.filePath(),
chunk);
-
- compressed.flip();
- compressed.limit(chunk.length);
+ ByteBuffer compressed = readFrom.read(chunk,
shouldCheckCrc);
uncompressed.clear();
- if (shouldCheckCrc)
- {
- int checksum = (int) ChecksumType.CRC32.of(compressed);
-
- compressed.limit(length);
- if (compressed.getInt() != checksum)
- throw new
CorruptBlockException(channel.filePath(), chunk);
-
- compressed.position(0).limit(chunk.length);
- }
-
try
{
metadata.compressor().uncompress(compressed,
uncompressed);
@@ -155,10 +291,9 @@ public abstract class CompressedChunkReader extends
AbstractReaderFileProxy impl
uncompressed.flip();
int checksum = (int)
ChecksumType.CRC32.of(uncompressed);
- ByteBuffer scratch =
bufferHolder.getBuffer(Integer.BYTES);
-
+ ByteBuffer scratch =
ByteBuffer.allocate(Integer.BYTES);
if (channel.read(scratch, chunk.offset + chunk.length)
!= Integer.BYTES
- || scratch.getInt(0) != checksum)
+ || scratch.getInt(0) != checksum)
throw new
CorruptBlockException(channel.filePath(), chunk);
}
}
@@ -171,6 +306,16 @@ public abstract class CompressedChunkReader extends
AbstractReaderFileProxy impl
throw new CorruptSSTableException(e, channel.filePath());
}
}
+
+ @Override
+ public void close()
+ {
+ reader.close();
+ if (scanReader != null)
+ scanReader.close();
+
+ super.close();
+ }
}
public static class Mmap extends CompressedChunkReader
@@ -233,7 +378,6 @@ public abstract class CompressedChunkReader extends
AbstractReaderFileProxy impl
uncompressed.position(0).limit(0);
throw new CorruptSSTableException(e, channel.filePath());
}
-
}
public void close()
diff --git a/src/java/org/apache/cassandra/io/util/EmptyRebufferer.java
b/src/java/org/apache/cassandra/io/util/EmptyRebufferer.java
index aa8e7e046f..7f54a6b180 100644
--- a/src/java/org/apache/cassandra/io/util/EmptyRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/EmptyRebufferer.java
@@ -64,7 +64,7 @@ public class EmptyRebufferer implements Rebufferer,
RebuffererFactory
}
@Override
- public Rebufferer instantiateRebufferer()
+ public Rebufferer instantiateRebufferer(boolean isScan)
{
return this;
}
diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java
b/src/java/org/apache/cassandra/io/util/FileHandle.java
index 943355d01d..67bfd239d6 100644
--- a/src/java/org/apache/cassandra/io/util/FileHandle.java
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -134,6 +134,11 @@ public class FileHandle extends SharedCloseableImpl
return createReader(null);
}
+ public RandomAccessReader createReaderForScan()
+ {
+ return createReader(null, true);
+ }
+
/**
* Create {@link RandomAccessReader} with configured method of reading
content of the file.
* Reading from file will be rate limited by given {@link RateLimiter}.
@@ -143,7 +148,12 @@ public class FileHandle extends SharedCloseableImpl
*/
public RandomAccessReader createReader(RateLimiter limiter)
{
- return new RandomAccessReader(instantiateRebufferer(limiter));
+ return createReader(limiter, false);
+ }
+
+ public RandomAccessReader createReader(RateLimiter limiter, boolean
forScan)
+ {
+ return new RandomAccessReader(instantiateRebufferer(limiter, forScan));
}
public FileDataInput createReader(long position)
@@ -186,7 +196,12 @@ public class FileHandle extends SharedCloseableImpl
public Rebufferer instantiateRebufferer(RateLimiter limiter)
{
- Rebufferer rebufferer = rebuffererFactory.instantiateRebufferer();
+ return instantiateRebufferer(limiter, false);
+ }
+
+ public Rebufferer instantiateRebufferer(RateLimiter limiter, boolean
forScan)
+ {
+ Rebufferer rebufferer =
rebuffererFactory.instantiateRebufferer(forScan);
if (limiter != null)
rebufferer = new LimitingRebufferer(rebufferer, limiter,
DiskOptimizationStrategy.MAX_BUFFER_SIZE);
diff --git a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
index 8df6370c5e..884bc97186 100644
--- a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
@@ -41,7 +41,7 @@ class MmapRebufferer extends AbstractReaderFileProxy
implements Rebufferer, Rebu
}
@Override
- public Rebufferer instantiateRebufferer()
+ public Rebufferer instantiateRebufferer(boolean isScan)
{
return this;
}
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 3ce1a2eb08..b89e59eb52 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -332,7 +332,7 @@ public class RandomAccessReader extends
RebufferingInputStream implements FileDa
try
{
ChunkReader reader = new SimpleChunkReader(channel, -1,
BufferType.OFF_HEAP, DEFAULT_BUFFER_SIZE);
- Rebufferer rebufferer = reader.instantiateRebufferer();
+ Rebufferer rebufferer = reader.instantiateRebufferer(false);
return new RandomAccessReaderWithOwnChannel(rebufferer);
}
catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
b/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
index ec35f0ba53..192fb8ea0c 100644
--- a/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
+++ b/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
@@ -28,5 +28,5 @@ package org.apache.cassandra.io.util;
*/
public interface RebuffererFactory extends ReaderFileProxy
{
- Rebufferer instantiateRebufferer();
+ Rebufferer instantiateRebufferer(boolean isScan);
}
diff --git a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
index 8d00ce5d40..fec1216bf4 100644
--- a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
@@ -55,7 +55,7 @@ class SimpleChunkReader extends AbstractReaderFileProxy
implements ChunkReader
}
@Override
- public Rebufferer instantiateRebufferer()
+ public Rebufferer instantiateRebufferer(boolean forScan)
{
if (Integer.bitCount(bufferSize) == 1)
return new BufferManagingRebufferer.Aligned(this);
diff --git
a/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
b/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
new file mode 100644
index 0000000000..824acaa8d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+
+public final class ThreadLocalReadAheadBuffer
+{
+ private static class Block
+ {
+ ByteBuffer buffer = null;
+ int index = -1;
+ }
+
+ private final ChannelProxy channel;
+
+ private final BufferType bufferType;
+
+ private static final FastThreadLocal<Map<String, Block>> blockMap = new
FastThreadLocal<>()
+ {
+ @Override
+ protected Map<String, Block> initialValue()
+ {
+ return new HashMap<>();
+ }
+ };
+
+ private final int bufferSize;
+ private final long channelSize;
+
+ public ThreadLocalReadAheadBuffer(ChannelProxy channel, int bufferSize,
BufferType bufferType)
+ {
+ this.channel = channel;
+ this.channelSize = channel.size();
+ this.bufferSize = bufferSize;
+ this.bufferType = bufferType;
+ }
+
+ public boolean hasBuffer()
+ {
+ return block().buffer != null;
+ }
+
+ /**
+ * Safe to call only if {@link #hasBuffer()} is true
+ */
+ public int remaining()
+ {
+ return getBlock().buffer.remaining();
+ }
+
+ public void allocateBuffer()
+ {
+ getBlock();
+ }
+
+ private Block getBlock()
+ {
+ Block block = block();
+ if (block.buffer == null)
+ {
+ block.buffer = bufferType.allocate(bufferSize);
+ block.buffer.clear();
+ }
+ return block;
+ }
+
+ private Block block()
+ {
+ return blockMap.get().computeIfAbsent(channel.filePath(), k -> new
Block());
+ }
+
+ public void fill(long position)
+ {
+ Block block = getBlock();
+ ByteBuffer blockBuffer = block.buffer;
+ long realPosition = Math.min(channelSize, position);
+ int blockNo = (int) (realPosition / bufferSize);
+ long blockPosition = blockNo * (long) bufferSize;
+
+ long remaining = channelSize - blockPosition;
+ int sizeToRead = (int) Math.min(remaining, bufferSize);
+ if (block.index != blockNo)
+ {
+ blockBuffer.flip();
+ blockBuffer.limit(sizeToRead);
+ if (channel.read(blockBuffer, blockPosition) != sizeToRead)
+ throw new CorruptSSTableException(null, channel.filePath());
+
+ block.index = blockNo;
+ }
+
+ blockBuffer.flip();
+ blockBuffer.limit(sizeToRead);
+ blockBuffer.position((int) (realPosition - blockPosition));
+ }
+
+ public int read(ByteBuffer dest, int length)
+ {
+ Block block = getBlock();
+ ByteBuffer blockBuffer = block.buffer;
+ ByteBuffer tmp = blockBuffer.duplicate();
+ tmp.limit(tmp.position() + length);
+ dest.put(tmp);
+ blockBuffer.position(blockBuffer.position() + length);
+
+ return length;
+ }
+
+ public void clear(boolean deallocate)
+ {
+ Block block = getBlock();
+ block.index = -1;
+
+ ByteBuffer blockBuffer = block.buffer;
+ if (blockBuffer != null)
+ {
+ blockBuffer.clear();
+ if (deallocate)
+ {
+ FileUtils.clean(blockBuffer);
+ block.buffer = null;
+ }
+ }
+ }
+
+ public void close()
+ {
+ clear(true);
+ blockMap.get().remove(channel.filePath());
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 5d90bcb572..53f815a586 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1430,6 +1430,19 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
return result;
}
+ @Override
+ public int getCompressedReadAheadBufferInKB()
+ {
+ return DatabaseDescriptor.getCompressedReadAheadBufferSizeInKB();
+ }
+
+ @Override
+ public void setCompressedReadAheadBufferInKB(int sizeInKb)
+ {
+ DatabaseDescriptor.setCompressedReadAheadBufferSizeInKb(sizeInKb);
+ logger.info("set compressed read ahead buffer size to {}KiB",
sizeInKb);
+ }
+
public int getBatchlogReplayThrottleInKB()
{
return DatabaseDescriptor.getBatchlogReplayThrottleInKiB();
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index bfb9eb97b7..7760ea0188 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -862,6 +862,9 @@ public interface StorageServiceMBean extends
NotificationEmitter
public void setCompactionThroughputMbPerSec(int value);
Map<String, String> getCurrentCompactionThroughputMebibytesPerSec();
+ public int getCompressedReadAheadBufferInKB();
+ public void setCompressedReadAheadBufferInKB(int sizeInKb);
+
public int getBatchlogReplayThrottleInKB();
public void setBatchlogReplayThrottleInKB(int value);
diff --git
a/test/unit/org/apache/cassandra/io/util/CompressedChunkReaderTest.java
b/test/unit/org/apache/cassandra/io/util/CompressedChunkReaderTest.java
new file mode 100644
index 0000000000..af4b458fec
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/CompressedChunkReaderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.CompressionParams;
+import org.assertj.core.api.Assertions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static accord.utils.Property.qt;
+
+public class CompressedChunkReaderTest
+{
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ }
+
+ @Test
+ public void scanReaderReadsLessThanRAReader()
+ {
+ var optionGen = options();
+ var paramsGen = params();
+ var lengthGen = Gens.longs().between(1, 1 << 16);
+ qt().withSeed(-1871070464864118891L).forAll(Gens.random(), optionGen,
paramsGen).check((rs, option, params) -> {
+ ListenableFileSystem fs =
FileSystems.newGlobalInMemoryFileSystem();
+
+ File f = new File("/file.db");
+ AtomicInteger reads = new AtomicInteger();
+ fs.onPostRead(f.path::equals, (p, c, pos, dst, r) -> {
+ reads.incrementAndGet();
+ });
+ long length = lengthGen.nextLong(rs);
+ CompressionMetadata metadata1, metadata2;
+ try (CompressedSequentialWriter writer = new
CompressedSequentialWriter(f, new File("/file.offset"), new
File("/file.digest"), option, params, new MetadataCollector(new
ClusteringComparator())))
+ {
+ for (long i = 0; i < length; i++)
+ writer.writeLong(i);
+
+ writer.sync();
+ metadata1 = writer.open(0);
+ metadata2 = writer.open(0);
+ }
+
+ doReads(f, metadata1, length, true);
+ int scanReads = reads.getAndSet(0);
+
+ doReads(f, metadata2, length, false);
+ int raReads = reads.getAndSet(0);
+
+ if (Files.size(f.toPath()) >
DatabaseDescriptor.getCompressedReadAheadBufferSize())
+ Assert.assertTrue(scanReads < raReads);
+ });
+ }
+
+ private void doReads(File f, CompressionMetadata metadata, long length,
boolean useReadAhead)
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(metadata.chunkLength());
+
+ try (ChannelProxy channel = new ChannelProxy(f);
+ CompressedChunkReader reader = new
CompressedChunkReader.Standard(channel, metadata, () -> 1.1);
+ metadata)
+ {
+ if (useReadAhead)
+ reader.forScan();
+
+ long offset = 0;
+ long maxOffset = length * Long.BYTES;
+ do
+ {
+ reader.readChunk(offset, buffer);
+ for (long expected = offset / Long.BYTES;
buffer.hasRemaining(); expected++)
+
Assertions.assertThat(buffer.getLong()).isEqualTo(expected);
+
+ offset += metadata.chunkLength();
+ }
+ while (offset < maxOffset);
+ }
+ finally
+ {
+ FileUtils.clean(buffer);
+ }}
+
+ private static Gen<SequentialWriterOption> options()
+ {
+ Gen<Integer> bufferSizes = Gens.constant(1 << 10); //.pickInt(1 << 4,
1 << 10, 1 << 15);
+ return rs -> SequentialWriterOption.newBuilder()
+ .finishOnClose(false)
+ .bufferSize(bufferSizes.next(rs))
+ .build();
+ }
+
+ private enum CompressionKind { Noop, Snappy, Deflate, Lz4, Zstd }
+
+ private static Gen<CompressionParams> params()
+ {
+ Gen<Integer> chunkLengths =
Gens.constant(CompressionParams.DEFAULT_CHUNK_LENGTH);
+ Gen<Double> compressionRatio = Gens.pick(1.1D);
+ return rs -> {
+ CompressionKind kind = rs.pick(CompressionKind.values());
+ switch (kind)
+ {
+ case Noop: return CompressionParams.noop();
+ case Snappy: return
CompressionParams.snappy(chunkLengths.next(rs), compressionRatio.next(rs));
+ case Deflate: return
CompressionParams.deflate(chunkLengths.next(rs));
+ case Lz4: return CompressionParams.lz4(chunkLengths.next(rs));
+ case Zstd: return
CompressionParams.zstd(chunkLengths.next(rs));
+ default: throw new UnsupportedOperationException(kind.name());
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/io/util/ThreadLocalReadAheadBufferTest.java
b/test/unit/org/apache/cassandra/io/util/ThreadLocalReadAheadBufferTest.java
new file mode 100644
index 0000000000..4d43017b2a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/ThreadLocalReadAheadBufferTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.utils.Pair;
+import org.quicktheories.WithQuickTheories;
+import org.quicktheories.core.Gen;
+
+import static
org.apache.cassandra.config.CassandraRelevantProperties.JAVA_IO_TMPDIR;
+
/**
 * Property-based tests checking that reads served through {@code ThreadLocalReadAheadBuffer}
 * return the same bytes as direct {@code ChannelProxy} reads at the same positions.
 */
public class ThreadLocalReadAheadBufferTest implements WithQuickTheories
{
    private static final int numFiles = 5;
    private static final File[] files = new File[numFiles];
    private static final Logger logger = LoggerFactory.getLogger(ThreadLocalReadAheadBufferTest.class);

    /**
     * Writes {@link #numFiles} random-content files once for the whole class.
     * NOTE(review): the logged seed only governs file *contents* (passed to writeFile);
     * each file's *size* comes from a fresh unseeded Random, so a failing run is not
     * fully reproducible from the logged seed alone — confirm whether that is intended.
     */
    @BeforeClass
    public static void setup()
    {
        int seed = new Random().nextInt();
        logger.info("Seed: {}", seed);

        for (int i = 0; i < numFiles; i++)
        {
            // size bounded so five files stay well under filesystem/heap limits
            int size = new Random().nextInt((Integer.MAX_VALUE - 1) / 8);
            files[i] = writeFile(seed, size);
        }
    }

    /** Best-effort deletion of the generated files; failures are deliberately ignored. */
    @AfterClass
    public static void cleanup()
    {
        for (File f : files)
        {
            try
            {
                f.delete();
            }
            catch (Exception e)
            {
                // ignore — temp-dir leftovers are harmless and the OS will reclaim them
            }
        }
    }

    @Test
    public void testLastBlockReads()
    {
        qt().forAll(lastBlockReads())
            .checkAssert(this::testReads);
    }

    @Test
    public void testReadsLikeChannelProxy()
    {

        qt().forAll(randomReads())
            .checkAssert(this::testReads);
    }

    /**
     * For each (position, length) pair, reads the span twice — once directly via the
     * channel, once through the read-ahead buffer — and asserts the bytes match.
     * NOTE(review): ByteBuffer.equals compares only the *remaining* bytes; after filling,
     * both buffers may sit at their limits with zero remaining, which would make the
     * assertion vacuously true — confirm positions are rewound/flipped where needed.
     * NOTE(review): trlab is never released/closed after the loop — confirm
     * ThreadLocalReadAheadBuffer holds no buffers or resources that need freeing here.
     */
    private void testReads(InputData propertyInputs)
    {
        try (ChannelProxy channel = new ChannelProxy(propertyInputs.file))
        {
            ThreadLocalReadAheadBuffer trlab = new ThreadLocalReadAheadBuffer(channel, new DataStorageSpec.IntKibibytesBound("256KiB").toBytes(), BufferType.OFF_HEAP);
            for (Pair<Long, Integer> read : propertyInputs.positionsAndLengths)
            {
                // clamp the requested length so reads never run past end-of-file
                int readSize = Math.min(read.right,(int) (channel.size() - read.left));
                ByteBuffer buf1 = ByteBuffer.allocate(readSize);
                channel.read(buf1, read.left);

                ByteBuffer buf2 = ByteBuffer.allocate(readSize);
                try
                {
                    int copied = 0;
                    // drain the span through the read-ahead buffer, refilling per block
                    while (copied < readSize) {
                        trlab.fill(read.left + copied);
                        int leftToRead = readSize - copied;
                        if (trlab.remaining() >= leftToRead)
                            copied += trlab.read(buf2, leftToRead);
                        else
                            copied += trlab.read(buf2, trlab.remaining());
                    }
                }
                catch (CorruptSSTableException e)
                {
                    throw new RuntimeException(e);
                }

                Assert.assertEquals(buf1, buf2);
            }
        }
    }

    /**
     * Generates 5–10 (position, length) pairs per file with positions anywhere in the file.
     * NOTE(review): the names of this method and {@link #randomReads()} look swapped —
     * this one draws positions across the whole file, while randomReads draws only from
     * the final block-sized window. Confirm which behavior each test intends.
     */
    private Gen<InputData> lastBlockReads()
    {
        return arbitrary().pick(List.of(files))
                          .flatMap((file) ->
                                   lists().of(longs().between(0, fileSize(file)).zip(integers().between(1, 100), Pair::create))
                                          .ofSizeBetween(5, 10)
                                          .map(positionsAndLengths -> new InputData(file, positionsAndLengths)));

    }

    /**
     * Generates 5–10 (position, length) pairs per file with positions confined to the last
     * 256 KiB of the file (one read-ahead block). See the naming note on lastBlockReads().
     */
    private Gen<InputData> randomReads()
    {
        int blockSize = new DataStorageSpec.IntKibibytesBound("256KiB").toBytes();
        return arbitrary().pick(List.of(files))
                          .flatMap((file) ->
                                   lists().of(longs().between(fileSize(file) - blockSize, fileSize(file)).zip(integers().between(1, 100), Pair::create))
                                          .ofSizeBetween(5, 10)
                                          .map(positionsAndLengths -> new InputData(file, positionsAndLengths)));

    }

    // need this because generators don't handle the IOException
    private long fileSize(File file)
    {
        try
        {
            return Files.size(file.toPath());
        } catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    /** Immutable property input: a data file plus the (position, length) pairs to read from it. */
    private static class InputData
    {

        private final File file;
        private final List<Pair<Long, Integer>> positionsAndLengths;

        public InputData(File file, List<Pair<Long, Integer>> positionsAndLengths)
        {
            this.file = file;
            this.positionsAndLengths = positionsAndLengths;
        }
    }

    /**
     * Writes {@code length} bytes of seeded pseudo-random data to a temp file and returns it.
     * NOTE(review): the file name is derived from length only ("data+<length>.bin"), so two
     * generated files with equal sizes would silently overwrite each other — confirm the
     * size space makes collisions acceptably unlikely.
     */
    private static File writeFile(int seed, int length)
    {
        String fileName = JAVA_IO_TMPDIR.getString() + "data+" + length + ".bin";

        byte[] dataChunk = new byte[4096 * 8];
        java.util.Random random = new Random(seed);
        int writtenData = 0;

        File file = new File(fileName);
        try (FileOutputStream fos = new FileOutputStream(file.toJavaIOFile()))
        {
            while (writtenData < length)
            {
                random.nextBytes(dataChunk);
                int toWrite = Math.min((length - writtenData), dataChunk.length);
                fos.write(dataChunk, 0, toWrite);
                writtenData += toWrite;
            }
            fos.flush();
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }

        return file;
    }

}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]