Repository: cassandra Updated Branches: refs/heads/trunk a59be2693 -> 6422e3476
Reduce IOP cost for small reads. Makes default buffer size for (uncompressed) buffered reads smaller, based on the expected partition size. Patch by stefania; reviewed by benedict for CASSANDRA-8894 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6422e347 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6422e347 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6422e347 Branch: refs/heads/trunk Commit: 6422e34769e42b4eef5d9c073301c9791e9ca8b2 Parents: a59be26 Author: Stefania Alborghetti <[email protected]> Authored: Mon Jun 22 15:35:03 2015 +0800 Committer: Benedict Elliott Smith <[email protected]> Committed: Tue Jul 28 15:31:13 2015 +0100 ---------------------------------------------------------------------- conf/cassandra.yaml | 8 +- .../org/apache/cassandra/config/Config.java | 28 +++-- .../cassandra/config/DatabaseDescriptor.java | 27 +++++ .../io/sstable/format/SSTableReader.java | 13 +-- .../io/sstable/format/big/BigTableWriter.java | 14 +-- .../io/util/BufferedSegmentedFile.java | 10 +- .../io/util/CompressedSegmentedFile.java | 12 +-- .../cassandra/io/util/MmappedSegmentedFile.java | 10 +- .../cassandra/io/util/RandomAccessReader.java | 27 +++-- .../apache/cassandra/io/util/SegmentedFile.java | 103 ++++++++++++++++--- .../cassandra/io/util/ThrottledReader.java | 8 +- test/unit/org/apache/cassandra/MockSchema.java | 3 +- .../db/lifecycle/TransactionLogsTest.java | 5 +- .../cassandra/io/RandomAccessReaderTest.java | 48 ++++++++- .../cassandra/io/util/SegmentedFileTest.java | 88 ++++++++++++++++ 15 files changed, 332 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 5fe3d87..7ce36af 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -354,7 +354,13 @@ concurrent_counter_writes: 32 # buffer_pool_use_heap_if_exhausted: true -# Total permitted memory to use for memtables. Cassandra will stop +# The strategy for optimizing disk reads +# Possible values are: +# ssd (for solid state disks, the default) +# spinning (for spinning disks) +# disk_optimization_strategy: ssd + +# Total permitted memory to use for memtables. Cassandra will stop # accepting writes when the limit is exceeded until a flush completes, # and will trigger a flush based on memtable_cleanup_threshold # If omitted, Cassandra will set both to 1/4 the size of the heap.
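----------------------------------------------------------------------
The sizing policy these settings control can be previewed in isolation. Below is a minimal standalone sketch that mirrors the arithmetic of SegmentedFile.Builder.bufferSize() and roundBufferSize() introduced later in this diff; the class and method names here are illustrative only, not part of the patch.

// Illustrative sketch of the buffer sizing this patch introduces; mirrors
// SegmentedFile.Builder.bufferSize()/roundBufferSize() from the diff below.
// Class and method names are hypothetical.
public final class BufferSizeSketch
{
    static final int PAGE = 4096;

    // ssd strategy: add a page only when a uniformly placed record of this size
    // crosses into the next page with probability >= pageCrossChance
    static int ssd(long recordSize, double pageCrossChance)
    {
        double crossProbability = (recordSize % PAGE) / (double) PAGE;
        if (crossProbability - pageCrossChance > -1e-16)
            recordSize += PAGE;
        return round(recordSize);
    }

    // spinning strategy: always pay for one extra page; the seek dominates the cost anyway
    static int spinning(long recordSize)
    {
        return round(recordSize + PAGE);
    }

    // round up to the next page multiple, capped at 64k
    static int round(long size)
    {
        if (size <= 0)
            return PAGE;
        size = (size + PAGE - 1) & ~(PAGE - 1);
        return (int) Math.min(size, 1 << 16);
    }

    public static void main(String[] args)
    {
        System.out.println(ssd(300, 0.1)); // 4096: cross probability ~0.07 < 0.1
        System.out.println(spinning(300)); // 8192: one extra page, unconditionally
    }
}

With the default disk_optimization_estimate_percentile of 0.95, the data file buffer is sized from the 95th-percentile partition size, so most partitions complete in a single buffered read instead of the old fixed 64k read.
----------------------------------------------------------------------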
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index fe6752f..64b23dd 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -222,6 +222,12 @@ public class Config public boolean buffer_pool_use_heap_if_exhausted = true; + public DiskOptimizationStrategy disk_optimization_strategy = DiskOptimizationStrategy.ssd; + + public double disk_optimization_estimate_percentile = 0.95; + + public double disk_optimization_page_cross_chance = 0.1; + public boolean inter_dc_tcp_nodelay = true; public MemtableAllocationType memtable_allocation_type = MemtableAllocationType.heap_buffers; @@ -308,17 +314,17 @@ public class Config isClientMode = clientMode; } - public static enum CommitLogSync + public enum CommitLogSync { periodic, batch } - public static enum InternodeCompression + public enum InternodeCompression { all, none, dc } - public static enum DiskAccessMode + public enum DiskAccessMode { auto, mmap, @@ -326,7 +332,7 @@ public class Config standard, } - public static enum MemtableAllocationType + public enum MemtableAllocationType { unslabbed_heap_buffers, heap_buffers, @@ -334,7 +340,7 @@ public class Config offheap_objects } - public static enum DiskFailurePolicy + public enum DiskFailurePolicy { best_effort, stop, @@ -343,7 +349,7 @@ public class Config die } - public static enum CommitFailurePolicy + public enum CommitFailurePolicy { stop, stop_commit, @@ -351,15 +357,21 @@ public class Config die, } - public static enum UserFunctionTimeoutPolicy + public enum UserFunctionTimeoutPolicy { ignore, die, die_immediate } - public static enum RequestSchedulerId + public enum RequestSchedulerId { keyspace } + + public enum DiskOptimizationStrategy + { + ssd, + spinning + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index a25af65..f1369d1 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1493,6 +1493,33 @@ public class DatabaseDescriptor return conf.buffer_pool_use_heap_if_exhausted; } + public static Config.DiskOptimizationStrategy getDiskOptimizationStrategy() + { + return conf.disk_optimization_strategy; + } + + @VisibleForTesting + public static void setDiskOptimizationStrategy(Config.DiskOptimizationStrategy strategy) + { + conf.disk_optimization_strategy = strategy; + } + + public static double getDiskOptimizationEstimatePercentile() + { + return conf.disk_optimization_estimate_percentile; + } + + public static double getDiskOptimizationPageCrossChance() + { + return conf.disk_optimization_page_cross_chance; + } + + @VisibleForTesting + public static void setDiskOptimizationPageCrossChance(double chance) + { + conf.disk_optimization_page_cross_chance = chance; + } + public static long getTotalCommitlogSpaceInMB() { return conf.commitlog_total_space_in_mb; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index a8aedc7..bae0858 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -51,6 +51,7 @@ import org.apache.cassandra.dht.*; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.big.BigTableWriter; import org.apache.cassandra.io.sstable.metadata.*; import org.apache.cassandra.io.util.*; import org.apache.cassandra.metrics.RestorableMeter; @@ -414,8 +415,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { if (!sstable.loadSummary(ibuilder, dbuilder)) sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL); - sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)); - sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA)); + sstable.ifile = ibuilder.buildIndex(sstable.descriptor, sstable.indexSummary); + sstable.dfile = dbuilder.buildData(sstable.descriptor, statsMetadata); sstable.bf = FilterFactory.AlwaysPresent; sstable.setup(true); return sstable; @@ -719,9 +720,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } if (components.contains(Component.PRIMARY_INDEX)) - ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + ifile = ibuilder.buildIndex(descriptor, indexSummary); - dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + dfile = dbuilder.buildData(descriptor, sstableMetadata); // Check for an index summary that was downsampled even though the serialization format doesn't support // that. If it was downsampled, rebuild it. See CASSANDRA-8993 for details. 
@@ -738,8 +739,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SegmentedFile.Builder dbuilderRebuild = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression)) { buildSummary(false, ibuilderRebuild, dbuilderRebuild, false, Downsampling.BASE_SAMPLING_LEVEL); - ifile = ibuilderRebuild.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); - dfile = dbuilderRebuild.complete(descriptor.filenameFor(Component.DATA)); + ifile = ibuilderRebuild.buildIndex(descriptor, indexSummary); + dfile = dbuilderRebuild.buildData(descriptor, sstableMetadata); saveSummary(ibuilderRebuild, dbuilderRebuild); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 13c9954..ff279a8 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -243,12 +243,13 @@ public class BigTableWriter extends SSTableWriter StatsMetadata stats = statsMetadata(); assert boundary.indexLength > 0 && boundary.dataLength > 0; // open the reader early - SegmentedFile ifile = iwriter.builder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX), boundary.indexLength); - SegmentedFile dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA), boundary.dataLength); + IndexSummary indexSummary = iwriter.summary.build(partitioner, boundary); + SegmentedFile ifile = iwriter.builder.buildIndex(descriptor, indexSummary, boundary); + SegmentedFile dfile = dbuilder.buildData(descriptor, stats, boundary); SSTableReader sstable = SSTableReader.internalOpen(descriptor, components, metadata, partitioner, ifile, - dfile, iwriter.summary.build(partitioner, boundary), + dfile, indexSummary, iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header); // now it's open, find the ACTUAL last readable key (i.e. 
for which the data file has also been flushed) @@ -274,15 +275,16 @@ public class BigTableWriter extends SSTableWriter StatsMetadata stats = statsMetadata(); // finalize in-memory state for the reader - SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX)); - SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA)); + IndexSummary indexSummary = iwriter.summary.build(partitioner); + SegmentedFile ifile = iwriter.builder.buildIndex(desc, indexSummary); + SegmentedFile dfile = dbuilder.buildData(desc, stats); SSTableReader sstable = SSTableReader.internalOpen(desc, components, this.metadata, partitioner, ifile, dfile, - iwriter.summary.build(partitioner), + indexSummary, iwriter.bf.sharedCopy(), maxDataAge, stats, http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java index 2c59def..744e828 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java @@ -19,9 +19,9 @@ package org.apache.cassandra.io.util; public class BufferedSegmentedFile extends SegmentedFile { - public BufferedSegmentedFile(ChannelProxy channel, long length) + public BufferedSegmentedFile(ChannelProxy channel, int bufferSize, long length) { - super(new Cleanup(channel), channel, length); + super(new Cleanup(channel), channel, bufferSize, length); } private BufferedSegmentedFile(BufferedSegmentedFile copy) @@ -48,16 +48,16 @@ public class BufferedSegmentedFile extends SegmentedFile // only one segment in a standard-io file } - public SegmentedFile complete(ChannelProxy channel, long overrideLength) + public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength) { long length = overrideLength > 0 ? 
overrideLength : channel.size(); - return new BufferedSegmentedFile(channel, length); + return new BufferedSegmentedFile(channel, bufferSize, length); } } public FileDataInput getSegment(long position) { - RandomAccessReader reader = RandomAccessReader.open(channel); + RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L); reader.seek(position); return reader; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java index ceff7ba..2ae4781 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java @@ -37,14 +37,14 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse private static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE; private final TreeMap<Long, MappedByteBuffer> chunkSegments; - public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata) + public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata) { - this(channel, metadata, createMappedSegments(channel, metadata)); + this(channel, bufferSize, metadata, createMappedSegments(channel, metadata)); } - public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments) + public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments) { - super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength); + super(new Cleanup(channel, metadata, chunkSegments), channel, bufferSize, metadata.dataLength, metadata.compressedFileLength); this.metadata = metadata; this.chunkSegments = chunkSegments; } @@ -144,9 +144,9 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse return writer.open(overrideLength); } - public SegmentedFile complete(ChannelProxy channel, long overrideLength) + public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength) { - return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength)); + return new CompressedSegmentedFile(channel, bufferSize, metadata(channel.filePath(), overrideLength)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java index 91908c9..879ca6f 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java @@ -43,9 +43,9 @@ public class MmappedSegmentedFile extends SegmentedFile */ private final Segment[] segments; - public MmappedSegmentedFile(ChannelProxy channel, long length, Segment[] segments) + public MmappedSegmentedFile(ChannelProxy channel, int bufferSize, long length, Segment[] segments) { - super(new Cleanup(channel, segments), channel, length); + super(new Cleanup(channel, segments), channel, bufferSize, length); this.segments = segments; } @@ -90,7 +90,7 
@@ public class MmappedSegmentedFile extends SegmentedFile // we can have single cells or partitions larger than 2Gb, which is our maximum addressable range in a single segment; // in this case we open as a normal random access reader // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row - RandomAccessReader file = RandomAccessReader.open(channel); + RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L); file.seek(position); return file; } @@ -183,11 +183,11 @@ public class MmappedSegmentedFile extends SegmentedFile } } - public SegmentedFile complete(ChannelProxy channel, long overrideLength) + public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength) { long length = overrideLength > 0 ? overrideLength : channel.size(); // create the segments - return new MmappedSegmentedFile(channel, length, createSegments(channel, length)); + return new MmappedSegmentedFile(channel, bufferSize, length, createSegments(channel, length)); } private Segment[] createSegments(ChannelProxy channel, long length) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index c4be8e9..b13d154 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -27,7 +27,7 @@ import org.apache.cassandra.utils.memory.BufferPool; public class RandomAccessReader extends AbstractDataInput implements FileDataInput { - public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + public static final int DEFAULT_BUFFER_SIZE = 4096; // the IO channel to the file, we do not own a reference to this due to // performance reasons (CASSANDRA-9379) so it's up to the owner of the RAR to @@ -59,9 +59,16 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp buffer.limit(0); } + /** The buffer size is typically already page aligned but if that is not the case + * make sure that it is a multiple of the page size, 4096. 
+ */ protected int getBufferSize(int size) { - return (int)Math.min(fileLength, size); + if ((size & ~4095) != size) + { // should already be a page-size multiple, but if it's not, round it up + size = (size + 4095) & ~4095; + } + return size; } protected ByteBuffer allocateBuffer(int size, BufferType bufferType) @@ -103,12 +110,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp public static RandomAccessReader open(ChannelProxy channel) { - return open(channel, -1L); - } - - public static RandomAccessReader open(ChannelProxy channel, long overrideSize) - { - return open(channel, DEFAULT_BUFFER_SIZE, overrideSize); + return open(channel, DEFAULT_BUFFER_SIZE, -1L); } public static RandomAccessReader open(ChannelProxy channel, int bufferSize, long overrideSize) @@ -132,7 +134,14 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp long position = bufferOffset; long limit = bufferOffset; - while (buffer.hasRemaining() && limit < fileLength) + + long pageAlignedPos = position & ~4095; + // Because the buffer capacity is a multiple of the page size, the first + // read may be short; after that, reads fall on page boundaries only, + // unless the user seeks elsewhere + long upperLimit = Math.min(fileLength, pageAlignedPos + buffer.capacity()); + buffer.limit((int)(upperLimit - position)); + while (buffer.hasRemaining() && limit < upperLimit) { int n = channel.read(buffer, position); if (n < 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index 13a0ec7..e586682 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -25,13 +25,17 @@ import java.nio.MappedByteBuffer; import java.util.Iterator; import java.util.NoSuchElementException; -import com.google.common.base.Throwables; import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.IndexSummary; +import org.apache.cassandra.io.sstable.IndexSummaryBuilder; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.RefCounted; @@ -51,6 +55,7 @@ import static org.apache.cassandra.utils.Throwables.maybeFail; public abstract class SegmentedFile extends SharedCloseableImpl { public final ChannelProxy channel; + public final int bufferSize; public final long length; // This differs from length for compressed files (but we still need length for @@ -60,15 +65,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl /** * Use getBuilder to get a Builder to construct a SegmentedFile.
*/ - SegmentedFile(Cleanup cleanup, ChannelProxy channel, long length) + SegmentedFile(Cleanup cleanup, ChannelProxy channel, int bufferSize, long length) { - this(cleanup, channel, length, length); + this(cleanup, channel, bufferSize, length, length); } - protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, long length, long onDiskLength) + protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, int bufferSize, long length, long onDiskLength) { super(cleanup); this.channel = channel; + this.bufferSize = bufferSize; this.length = length; this.onDiskLength = onDiskLength; } @@ -77,6 +83,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl { super(copy); channel = copy.channel; + bufferSize = copy.bufferSize; length = copy.length; onDiskLength = copy.onDiskLength; } @@ -109,13 +116,13 @@ public abstract class SegmentedFile extends SharedCloseableImpl public RandomAccessReader createReader() { - return RandomAccessReader.open(channel, length); + return RandomAccessReader.open(channel, bufferSize, length); } public RandomAccessReader createThrottledReader(RateLimiter limiter) { assert limiter != null; - return ThrottledReader.open(channel, length, limiter); + return ThrottledReader.open(channel, bufferSize, length, limiter); } public FileDataInput getSegment(long position) @@ -171,19 +178,14 @@ public abstract class SegmentedFile extends SharedCloseableImpl * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk. * @param channel The channel to the file on disk. */ - protected abstract SegmentedFile complete(ChannelProxy channel, long overrideLength); + protected abstract SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength); - public SegmentedFile complete(String path) - { - return complete(path, -1L); - } - - public SegmentedFile complete(String path, long overrideLength) + private SegmentedFile complete(String path, int bufferSize, long overrideLength) { ChannelProxy channelCopy = getChannel(path); try { - return complete(channelCopy, overrideLength); + return complete(channelCopy, bufferSize, overrideLength); } catch (Throwable t) { @@ -192,6 +194,79 @@ public abstract class SegmentedFile extends SharedCloseableImpl } } + public SegmentedFile buildData(Descriptor desc, StatsMetadata stats, IndexSummaryBuilder.ReadableBoundary boundary) + { + return complete(desc.filenameFor(Component.DATA), bufferSize(stats), boundary.dataLength); + } + + public SegmentedFile buildData(Descriptor desc, StatsMetadata stats) + { + return complete(desc.filenameFor(Component.DATA), bufferSize(stats), -1L); + } + + public SegmentedFile buildIndex(Descriptor desc, IndexSummary indexSummary, IndexSummaryBuilder.ReadableBoundary boundary) + { + return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), boundary.indexLength); + } + + public SegmentedFile buildIndex(Descriptor desc, IndexSummary indexSummary) + { + return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), -1L); + } + + private int bufferSize(StatsMetadata stats) + { + return bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile())); + } + + private int bufferSize(Descriptor desc, IndexSummary indexSummary) + { + File file = new File(desc.filenameFor(Component.PRIMARY_INDEX)); + return bufferSize(file.length() / indexSummary.size()); + } + + /** + Return the buffer size for a given record size. 
For spinning disks always add one page. + For solid state disks only add one page if the chance of crossing to the next page is more + than a predefined value, @see Config.disk_optimization_page_cross_chance. + */ + static int bufferSize(long recordSize) + { + Config.DiskOptimizationStrategy strategy = DatabaseDescriptor.getDiskOptimizationStrategy(); + if (strategy == Config.DiskOptimizationStrategy.ssd) + { + // The crossing probability is calculated assuming a uniform distribution of record + // start position in a page, so it's the record size modulo the page size divided by + // the total page size. + double pageCrossProbability = (recordSize % 4096) / 4096.; + // if the page cross probability is equal to or greater than disk_optimization_page_cross_chance we add one page + if ((pageCrossProbability - DatabaseDescriptor.getDiskOptimizationPageCrossChance()) > -1e-16) + recordSize += 4096; + + return roundBufferSize(recordSize); + } + else if (strategy == Config.DiskOptimizationStrategy.spinning) + { + return roundBufferSize(recordSize + 4096); + } + else + { + throw new IllegalStateException("Unsupported disk optimization strategy: " + strategy); + } + } + + /** + Round up to the next multiple of 4k but no more than 64k + */ + static int roundBufferSize(long size) + { + if (size <= 0) + return 4096; + + size = (size + 4095) & ~4095; + return (int)Math.min(size, 1 << 16); + } + public void serializeBounds(DataOutput out) throws IOException { out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/ThrottledReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java index ea21355..024d38f 100644 --- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java +++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java @@ -29,9 +29,9 @@ public class ThrottledReader extends RandomAccessReader { private final RateLimiter limiter; - protected ThrottledReader(ChannelProxy channel, long overrideLength, RateLimiter limiter) + protected ThrottledReader(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter) { - super(channel, RandomAccessReader.DEFAULT_BUFFER_SIZE, overrideLength, BufferType.OFF_HEAP); + super(channel, bufferSize, overrideLength, BufferType.OFF_HEAP); this.limiter = limiter; } @@ -41,8 +41,8 @@ public class ThrottledReader extends RandomAccessReader super.reBuffer(); } - public static ThrottledReader open(ChannelProxy channel, long overrideLength, RateLimiter limiter) + public static ThrottledReader open(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter) { - return new ThrottledReader(channel, overrideLength, limiter); + return new ThrottledReader(channel, bufferSize, overrideLength, limiter); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/test/unit/org/apache/cassandra/MockSchema.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java index d9c7e8b..e052c0a 100644 --- a/test/unit/org/apache/cassandra/MockSchema.java +++ b/test/unit/org/apache/cassandra/MockSchema.java @@ -43,6 +43,7 @@ import org.apache.cassandra.io.util.BufferedSegmentedFile; import org.apache.cassandra.io.util.ChannelProxy; import
org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.Memory; +import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.io.util.SegmentedFile; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; @@ -61,7 +62,7 @@ public class MockSchema public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1))); public static final IndexSummary indexSummary; - private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0); + private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); public static Memtable memtable(ColumnFamilyStore cfs) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java index 3150087..4105800 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java @@ -48,6 +48,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.BufferedSegmentedFile; import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.io.util.SegmentedFile; import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest; @@ -517,8 +518,8 @@ public class TransactionLogsTest extends AbstractTransactionalTest } } - SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), 0); - SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), 0); + SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); + SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST); StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java index 71fab61..edbd603 100644 --- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java @@ -22,23 +22,61 @@ public class RandomAccessReaderTest @Test public void testReadFully() throws IOException { + testReadImpl(1, 0); + } + + @Test + public void testReadLarge() throws IOException + { + testReadImpl(1000, 0); + } + + @Test + public void testReadLargeWithSkip() throws IOException + { + 
testReadImpl(1000, 322); + } + + @Test + public void testReadBufferSizeNotAligned() throws IOException + { + testReadImpl(1000, 0, 5122); + } + + private void testReadImpl(int numIterations, int skipIterations) throws IOException + { + testReadImpl(numIterations, skipIterations, RandomAccessReader.DEFAULT_BUFFER_SIZE); + } + + private void testReadImpl(int numIterations, int skipIterations, int bufferSize) throws IOException + { final File f = File.createTempFile("testReadFully", "1"); final String expected = "The quick brown fox jumps over the lazy dog"; SequentialWriter writer = SequentialWriter.open(f); - writer.write(expected.getBytes()); + for (int i = 0; i < numIterations; i++) + writer.write(expected.getBytes()); writer.finish(); assert f.exists(); ChannelProxy channel = new ChannelProxy(f); - RandomAccessReader reader = RandomAccessReader.open(channel); + RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L); assertEquals(f.getAbsolutePath(), reader.getPath()); - assertEquals(expected.length(), reader.length()); + assertEquals(expected.length() * numIterations, reader.length()); + + if (skipIterations > 0) + { + reader.seek(skipIterations * expected.length()); + } byte[] b = new byte[expected.length()]; - reader.readFully(b); - assertEquals(expected, new String(b)); + int n = numIterations - skipIterations; + for (int i = 0; i < n; i++) + { + reader.readFully(b); + assertEquals(expected, new String(b)); + } assertTrue(reader.isEOF()); assertEquals(0, reader.bytesRemaining()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/test/unit/org/apache/cassandra/io/util/SegmentedFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/SegmentedFileTest.java b/test/unit/org/apache/cassandra/io/util/SegmentedFileTest.java new file mode 100644 index 0000000..03c10de --- /dev/null +++ b/test/unit/org/apache/cassandra/io/util/SegmentedFileTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.util; + +import org.junit.Test; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; + +import static org.junit.Assert.assertEquals; + +public class SegmentedFileTest +{ + @Test + public void testRoundingBufferSize() + { + assertEquals(4096, SegmentedFile.Builder.roundBufferSize(-1L)); + assertEquals(4096, SegmentedFile.Builder.roundBufferSize(0)); + assertEquals(4096, SegmentedFile.Builder.roundBufferSize(1)); + assertEquals(4096, SegmentedFile.Builder.roundBufferSize(2013)); + assertEquals(4096, SegmentedFile.Builder.roundBufferSize(4095)); + assertEquals(4096, SegmentedFile.Builder.roundBufferSize(4096)); + assertEquals(8192, SegmentedFile.Builder.roundBufferSize(4097)); + assertEquals(8192, SegmentedFile.Builder.roundBufferSize(8191)); + assertEquals(8192, SegmentedFile.Builder.roundBufferSize(8192)); + assertEquals(12288, SegmentedFile.Builder.roundBufferSize(8193)); + assertEquals(65536, SegmentedFile.Builder.roundBufferSize(65535)); + assertEquals(65536, SegmentedFile.Builder.roundBufferSize(65536)); + assertEquals(65536, SegmentedFile.Builder.roundBufferSize(65537)); + assertEquals(65536, SegmentedFile.Builder.roundBufferSize(10000000000000000L)); + } + + @Test + public void testBufferSize_ssd() + { + DatabaseDescriptor.setDiskOptimizationStrategy(Config.DiskOptimizationStrategy.ssd); + DatabaseDescriptor.setDiskOptimizationPageCrossChance(0.1); + + assertEquals(4096, SegmentedFile.Builder.bufferSize(0)); + assertEquals(4096, SegmentedFile.Builder.bufferSize(10)); + assertEquals(4096, SegmentedFile.Builder.bufferSize(100)); + assertEquals(4096, SegmentedFile.Builder.bufferSize(4096)); + assertEquals(8192, SegmentedFile.Builder.bufferSize(4505)); // just < (4096 + 4096 * 0.1) + assertEquals(12288, SegmentedFile.Builder.bufferSize(4506)); // just > (4096 + 4096 * 0.1) + + DatabaseDescriptor.setDiskOptimizationPageCrossChance(0.5); + assertEquals(8192, SegmentedFile.Builder.bufferSize(4506)); // > (4096 + 4096 * 0.1) but < (4096 + 4096 * 0.5), so no extra page now + assertEquals(8192, SegmentedFile.Builder.bufferSize(6143)); // < (4096 + 4096 * 0.5) + assertEquals(12288, SegmentedFile.Builder.bufferSize(6144)); // = (4096 + 4096 * 0.5) + assertEquals(12288, SegmentedFile.Builder.bufferSize(6145)); // > (4096 + 4096 * 0.5) + + DatabaseDescriptor.setDiskOptimizationPageCrossChance(1.0); // never add a page + assertEquals(8192, SegmentedFile.Builder.bufferSize(8191)); + assertEquals(8192, SegmentedFile.Builder.bufferSize(8192)); + + DatabaseDescriptor.setDiskOptimizationPageCrossChance(0.0); // always add a page + assertEquals(8192, SegmentedFile.Builder.bufferSize(10)); + assertEquals(8192, SegmentedFile.Builder.bufferSize(4096)); + } + + @Test + public void testBufferSize_spinning() + { + DatabaseDescriptor.setDiskOptimizationStrategy(Config.DiskOptimizationStrategy.spinning); + + assertEquals(4096, SegmentedFile.Builder.bufferSize(0)); + assertEquals(8192, SegmentedFile.Builder.bufferSize(10)); + assertEquals(8192, SegmentedFile.Builder.bufferSize(100)); + assertEquals(8192, SegmentedFile.Builder.bufferSize(4096)); + assertEquals(12288, SegmentedFile.Builder.bufferSize(4097)); + }
