Repository: cassandra
Updated Branches:
  refs/heads/trunk a59be2693 -> 6422e3476


Reduce IOP cost for small reads

Makes the default buffer size for (uncompressed) buffered reads smaller,
sizing it according to the expected partition size
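
As a rough worked example of the new sizing (assuming the default ssd
strategy, the 0.95 estimate percentile and the 0.1 page-cross chance added
below): a table whose 95th-percentile partition size is around 300 bytes now
reads through a single 4 KiB buffer, since 300 / 4096 gives roughly a 7%
chance of crossing into a second page, below the 10% threshold; previously
every such read went through a 64 KiB buffer.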

patch by stefania; reviewed by benedict for CASSANDRA-8894


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6422e347
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6422e347
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6422e347

Branch: refs/heads/trunk
Commit: 6422e34769e42b4eef5d9c073301c9791e9ca8b2
Parents: a59be26
Author: Stefania Alborghetti <[email protected]>
Authored: Mon Jun 22 15:35:03 2015 +0800
Committer: Benedict Elliott Smith <[email protected]>
Committed: Tue Jul 28 15:31:13 2015 +0100

----------------------------------------------------------------------
 conf/cassandra.yaml                             |   8 +-
 .../org/apache/cassandra/config/Config.java     |  28 +++--
 .../cassandra/config/DatabaseDescriptor.java    |  27 +++++
 .../io/sstable/format/SSTableReader.java        |  13 +--
 .../io/sstable/format/big/BigTableWriter.java   |  14 +--
 .../io/util/BufferedSegmentedFile.java          |  10 +-
 .../io/util/CompressedSegmentedFile.java        |  12 +--
 .../cassandra/io/util/MmappedSegmentedFile.java |  10 +-
 .../cassandra/io/util/RandomAccessReader.java   |  27 +++--
 .../apache/cassandra/io/util/SegmentedFile.java | 103 ++++++++++++++++---
 .../cassandra/io/util/ThrottledReader.java      |   8 +-
 test/unit/org/apache/cassandra/MockSchema.java  |   3 +-
 .../db/lifecycle/TransactionLogsTest.java       |   5 +-
 .../cassandra/io/RandomAccessReaderTest.java    |  48 ++++++++-
 .../cassandra/io/util/SegmentedFileTest.java    |  88 ++++++++++++++++
 15 files changed, 332 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5fe3d87..7ce36af 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -354,7 +354,13 @@ concurrent_counter_writes: 32
 
 # buffer_pool_use_heap_if_exhausted: true
 
-# Total permitted memory to use for memtables. Cassandra will stop 
+# The strategy for optimizing disk reads
+# Possible values are:
+# ssd (for solid state disks, the default)
+# spinning (for spinning disks)
+# disk_optimization_strategy: ssd
+
+# Total permitted memory to use for memtables. Cassandra will stop
 # accepting writes when the limit is exceeded until a flush completes,
 # and will trigger a flush based on memtable_cleanup_threshold
 # If omitted, Cassandra will set both to 1/4 the size of the heap.
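
As an illustrative sketch (not part of the patch), a node on rotational disks
would switch the strategy, and the two tuning knobs added to Config.java below
are plain Config fields, so they should be settable from cassandra.yaml in the
usual way:

    disk_optimization_strategy: spinning
    # partition-size percentile used to size (uncompressed) data file read buffers
    # disk_optimization_estimate_percentile: 0.95
    # ssd only: page-cross chance above which an extra 4 KiB page is added
    # disk_optimization_page_cross_chance: 0.1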

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index fe6752f..64b23dd 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -222,6 +222,12 @@ public class Config
 
     public boolean buffer_pool_use_heap_if_exhausted = true;
 
+    public DiskOptimizationStrategy disk_optimization_strategy = DiskOptimizationStrategy.ssd;
+
+    public double disk_optimization_estimate_percentile = 0.95;
+
+    public double disk_optimization_page_cross_chance = 0.1;
+
     public boolean inter_dc_tcp_nodelay = true;
 
     public MemtableAllocationType memtable_allocation_type = MemtableAllocationType.heap_buffers;
@@ -308,17 +314,17 @@ public class Config
         isClientMode = clientMode;
     }
 
-    public static enum CommitLogSync
+    public enum CommitLogSync
     {
         periodic,
         batch
     }
-    public static enum InternodeCompression
+    public enum InternodeCompression
     {
         all, none, dc
     }
 
-    public static enum DiskAccessMode
+    public enum DiskAccessMode
     {
         auto,
         mmap,
@@ -326,7 +332,7 @@ public class Config
         standard,
     }
 
-    public static enum MemtableAllocationType
+    public enum MemtableAllocationType
     {
         unslabbed_heap_buffers,
         heap_buffers,
@@ -334,7 +340,7 @@ public class Config
         offheap_objects
     }
 
-    public static enum DiskFailurePolicy
+    public enum DiskFailurePolicy
     {
         best_effort,
         stop,
@@ -343,7 +349,7 @@ public class Config
         die
     }
 
-    public static enum CommitFailurePolicy
+    public enum CommitFailurePolicy
     {
         stop,
         stop_commit,
@@ -351,15 +357,21 @@ public class Config
         die,
     }
 
-    public static enum UserFunctionTimeoutPolicy
+    public enum UserFunctionTimeoutPolicy
     {
         ignore,
         die,
         die_immediate
     }
 
-    public static enum RequestSchedulerId
+    public enum RequestSchedulerId
     {
         keyspace
     }
+
+    public enum DiskOptimizationStrategy
+    {
+        ssd,
+        spinning
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a25af65..f1369d1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1493,6 +1493,33 @@ public class DatabaseDescriptor
         return conf.buffer_pool_use_heap_if_exhausted;
     }
 
+    public static Config.DiskOptimizationStrategy getDiskOptimizationStrategy()
+    {
+        return conf.disk_optimization_strategy;
+    }
+
+    @VisibleForTesting
+    public static void setDiskOptimizationStrategy(Config.DiskOptimizationStrategy strategy)
+    {
+        conf.disk_optimization_strategy = strategy;
+    }
+
+    public static double getDiskOptimizationEstimatePercentile()
+    {
+        return conf.disk_optimization_estimate_percentile;
+    }
+
+    public static double getDiskOptimizationPageCrossChance()
+    {
+        return conf.disk_optimization_page_cross_chance;
+    }
+
+    @VisibleForTesting
+    public static void setDiskOptimizationPageCrossChance(double chance)
+    {
+        conf.disk_optimization_page_cross_chance = chance;
+    }
+
     public static long getTotalCommitlogSpaceInMB()
     {
         return conf.commitlog_total_space_in_mb;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index a8aedc7..bae0858 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
 import org.apache.cassandra.io.sstable.metadata.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
@@ -414,8 +415,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         {
             if (!sstable.loadSummary(ibuilder, dbuilder))
                 sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
-            sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
-            sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
+            sstable.ifile = ibuilder.buildIndex(sstable.descriptor, sstable.indexSummary);
+            sstable.dfile = dbuilder.buildData(sstable.descriptor, statsMetadata);
             sstable.bf = FilterFactory.AlwaysPresent;
             sstable.setup(true);
             return sstable;
@@ -719,9 +720,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             }
 
             if (components.contains(Component.PRIMARY_INDEX))
-                ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+                ifile = ibuilder.buildIndex(descriptor, indexSummary);
 
-            dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+            dfile = dbuilder.buildData(descriptor, sstableMetadata);
 
             // Check for an index summary that was downsampled even though the serialization format doesn't support
             // that.  If it was downsampled, rebuild it.  See CASSANDRA-8993 for details.
@@ -738,8 +739,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     SegmentedFile.Builder dbuilderRebuild = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
                 {
                     buildSummary(false, ibuilderRebuild, dbuilderRebuild, false, Downsampling.BASE_SAMPLING_LEVEL);
-                    ifile = ibuilderRebuild.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-                    dfile = dbuilderRebuild.complete(descriptor.filenameFor(Component.DATA));
+                    ifile = ibuilderRebuild.buildIndex(descriptor, indexSummary);
+                    dfile = dbuilderRebuild.buildData(descriptor, sstableMetadata);
                     saveSummary(ibuilderRebuild, dbuilderRebuild);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 13c9954..ff279a8 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -243,12 +243,13 @@ public class BigTableWriter extends SSTableWriter
         StatsMetadata stats = statsMetadata();
         assert boundary.indexLength > 0 && boundary.dataLength > 0;
         // open the reader early
-        SegmentedFile ifile = iwriter.builder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX), boundary.indexLength);
-        SegmentedFile dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA), boundary.dataLength);
+        IndexSummary indexSummary = iwriter.summary.build(partitioner, boundary);
+        SegmentedFile ifile = iwriter.builder.buildIndex(descriptor, indexSummary, boundary);
+        SegmentedFile dfile = dbuilder.buildData(descriptor, stats, boundary);
         SSTableReader sstable = SSTableReader.internalOpen(descriptor,
                                                            components, metadata,
                                                            partitioner, ifile,
-                                                           dfile, iwriter.summary.build(partitioner, boundary),
+                                                           dfile, indexSummary,
                                                            iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header);
 
         // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
@@ -274,15 +275,16 @@ public class BigTableWriter extends SSTableWriter
 
         StatsMetadata stats = statsMetadata();
         // finalize in-memory state for the reader
-        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX));
-        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA));
+        IndexSummary indexSummary = iwriter.summary.build(partitioner);
+        SegmentedFile ifile = iwriter.builder.buildIndex(desc, indexSummary);
+        SegmentedFile dfile = dbuilder.buildData(desc, stats);
         SSTableReader sstable = SSTableReader.internalOpen(desc,
                                                            components,
                                                            this.metadata,
                                                            partitioner,
                                                            ifile,
                                                            dfile,
-                                                           iwriter.summary.build(partitioner),
+                                                           indexSummary,
                                                            iwriter.bf.sharedCopy(),
                                                            maxDataAge,
                                                            stats,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 2c59def..744e828 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.io.util;
 
 public class BufferedSegmentedFile extends SegmentedFile
 {
-    public BufferedSegmentedFile(ChannelProxy channel, long length)
+    public BufferedSegmentedFile(ChannelProxy channel, int bufferSize, long length)
     {
-        super(new Cleanup(channel), channel, length);
+        super(new Cleanup(channel), channel, bufferSize, length);
     }
 
     private BufferedSegmentedFile(BufferedSegmentedFile copy)
@@ -48,16 +48,16 @@ public class BufferedSegmentedFile extends SegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
+        public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
         {
             long length = overrideLength > 0 ? overrideLength : channel.size();
-            return new BufferedSegmentedFile(channel, length);
+            return new BufferedSegmentedFile(channel, bufferSize, length);
         }
     }
 
     public FileDataInput getSegment(long position)
     {
-        RandomAccessReader reader = RandomAccessReader.open(channel);
+        RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L);
         reader.seek(position);
         return reader;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index ceff7ba..2ae4781 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -37,14 +37,14 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
     private static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
     private final TreeMap<Long, MappedByteBuffer> chunkSegments;
 
-    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata)
+    public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata)
     {
-        this(channel, metadata, createMappedSegments(channel, metadata));
+        this(channel, bufferSize, metadata, createMappedSegments(channel, metadata));
     }
 
-    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
+    public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
     {
-        super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength);
+        super(new Cleanup(channel, metadata, chunkSegments), channel, bufferSize, metadata.dataLength, metadata.compressedFileLength);
         this.metadata = metadata;
         this.chunkSegments = chunkSegments;
     }
@@ -144,9 +144,9 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
             return writer.open(overrideLength);
         }
 
-        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
+        public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
         {
-            return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength));
+            return new CompressedSegmentedFile(channel, bufferSize, metadata(channel.filePath(), overrideLength));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 91908c9..879ca6f 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -43,9 +43,9 @@ public class MmappedSegmentedFile extends SegmentedFile
      */
     private final Segment[] segments;
 
-    public MmappedSegmentedFile(ChannelProxy channel, long length, Segment[] segments)
+    public MmappedSegmentedFile(ChannelProxy channel, int bufferSize, long length, Segment[] segments)
     {
-        super(new Cleanup(channel, segments), channel, length);
+        super(new Cleanup(channel, segments), channel, bufferSize, length);
         this.segments = segments;
     }
 
@@ -90,7 +90,7 @@ public class MmappedSegmentedFile extends SegmentedFile
         // we can have single cells or partitions larger than 2Gb, which is our maximum addressable range in a single segment;
         // in this case we open as a normal random access reader
         // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
-        RandomAccessReader file = RandomAccessReader.open(channel);
+        RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L);
         file.seek(position);
         return file;
     }
@@ -183,11 +183,11 @@ public class MmappedSegmentedFile extends SegmentedFile
             }
         }
 
-        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
+        public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
         {
             long length = overrideLength > 0 ? overrideLength : channel.size();
             // create the segments
-            return new MmappedSegmentedFile(channel, length, createSegments(channel, length));
+            return new MmappedSegmentedFile(channel, bufferSize, length, createSegments(channel, length));
         }
 
         private Segment[] createSegments(ChannelProxy channel, long length)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index c4be8e9..b13d154 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.utils.memory.BufferPool;
 
 public class RandomAccessReader extends AbstractDataInput implements FileDataInput
 {
-    public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    public static final int DEFAULT_BUFFER_SIZE = 4096;
 
     // the IO channel to the file, we do not own a reference to this due to
     // performance reasons (CASSANDRA-9379) so it's up to the owner of the RAR to
@@ -59,9 +59,16 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
         buffer.limit(0);
     }
 
+    /**
+     * The buffer size is typically already page aligned, but if it is not,
+     * make sure that it is a multiple of the page size, 4096.
+     */
     protected int getBufferSize(int size)
     {
-        return (int)Math.min(fileLength, size);
+        if ((size & ~4095) != size)
+        { // should already be a multiple of the page size, but if it's not, round it up
+            size = (size + 4095) & ~4095;
+        }
+        return size;
     }
 
     protected ByteBuffer allocateBuffer(int size, BufferType bufferType)
@@ -103,12 +110,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
 
     public static RandomAccessReader open(ChannelProxy channel)
     {
-        return open(channel, -1L);
-    }
-
-    public static RandomAccessReader open(ChannelProxy channel, long overrideSize)
-    {
-        return open(channel, DEFAULT_BUFFER_SIZE, overrideSize);
+        return open(channel, DEFAULT_BUFFER_SIZE, -1L);
     }
 
     public static RandomAccessReader open(ChannelProxy channel, int bufferSize, long overrideSize)
@@ -132,7 +134,14 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
 
         long position = bufferOffset;
         long limit = bufferOffset;
-        while (buffer.hasRemaining() && limit < fileLength)
+
+        long pageAlignedPos = position & ~4095;
+        // Because the buffer capacity is a multiple of the page size, we may read less
+        // the first time; after that, reads happen at page boundaries only,
+        // unless the user seeks elsewhere
+        long upperLimit = Math.min(fileLength, pageAlignedPos + buffer.capacity());
+        buffer.limit((int)(upperLimit - position));
+        while (buffer.hasRemaining() && limit < upperLimit)
         {
             int n = channel.read(buffer, position);
             if (n < 0)
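
The reBuffer() change above caps the first fill at the end of a page-aligned
window so that subsequent reads start on 4 KiB boundaries. A minimal
standalone sketch of that arithmetic (class and method names here are
illustrative, not part of the patch):

    public class PageAlignExample
    {
        static final int PAGE = 4096;

        // Mirrors the reBuffer() logic above: limit the first read to the end of the
        // page-aligned window so later reads begin exactly on 4096-byte boundaries.
        static long firstReadLength(long position, int bufferCapacity, long fileLength)
        {
            long pageAlignedPos = position & ~(PAGE - 1);   // start of the page containing position
            long upperLimit = Math.min(fileLength, pageAlignedPos + bufferCapacity);
            return upperLimit - position;                    // bytes requested for this rebuffer
        }

        public static void main(String[] args)
        {
            // Seek to offset 5000 with an 8 KiB buffer in a large file: the window starts
            // at 4096, so only 4096 + 8192 - 5000 = 7288 bytes are read and the next
            // rebuffer begins exactly at offset 12288, a page boundary.
            System.out.println(firstReadLength(5000, 8192, 1 << 20));   // 7288
        }
    }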

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 13a0ec7..e586682 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -25,13 +25,17 @@ import java.nio.MappedByteBuffer;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.io.sstable.IndexSummaryBuilder;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.RefCounted;
@@ -51,6 +55,7 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
 public abstract class SegmentedFile extends SharedCloseableImpl
 {
     public final ChannelProxy channel;
+    public final int bufferSize;
     public final long length;
 
     // This differs from length for compressed files (but we still need length for
@@ -60,15 +65,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl
     /**
      * Use getBuilder to get a Builder to construct a SegmentedFile.
      */
-    SegmentedFile(Cleanup cleanup, ChannelProxy channel, long length)
+    SegmentedFile(Cleanup cleanup, ChannelProxy channel, int bufferSize, long length)
     {
-        this(cleanup, channel, length, length);
+        this(cleanup, channel, bufferSize, length, length);
     }
 
-    protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, long length, long onDiskLength)
+    protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, int bufferSize, long length, long onDiskLength)
     {
         super(cleanup);
         this.channel = channel;
+        this.bufferSize = bufferSize;
         this.length = length;
         this.onDiskLength = onDiskLength;
     }
@@ -77,6 +83,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
     {
         super(copy);
         channel = copy.channel;
+        bufferSize = copy.bufferSize;
         length = copy.length;
         onDiskLength = copy.onDiskLength;
     }
@@ -109,13 +116,13 @@ public abstract class SegmentedFile extends SharedCloseableImpl
 
     public RandomAccessReader createReader()
     {
-        return RandomAccessReader.open(channel, length);
+        return RandomAccessReader.open(channel, bufferSize, length);
     }
 
     public RandomAccessReader createThrottledReader(RateLimiter limiter)
     {
         assert limiter != null;
-        return ThrottledReader.open(channel, length, limiter);
+        return ThrottledReader.open(channel, bufferSize, length, limiter);
     }
 
     public FileDataInput getSegment(long position)
@@ -171,19 +178,14 @@ public abstract class SegmentedFile extends SharedCloseableImpl
          * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
          * @param channel The channel to the file on disk.
          */
-        protected abstract SegmentedFile complete(ChannelProxy channel, long overrideLength);
+        protected abstract SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength);
 
-        public SegmentedFile complete(String path)
-        {
-            return complete(path, -1L);
-        }
-
-        public SegmentedFile complete(String path, long overrideLength)
+        private SegmentedFile complete(String path, int bufferSize, long overrideLength)
         {
             ChannelProxy channelCopy = getChannel(path);
             try
             {
-                return complete(channelCopy, overrideLength);
+                return complete(channelCopy, bufferSize, overrideLength);
             }
             catch (Throwable t)
             {
@@ -192,6 +194,79 @@ public abstract class SegmentedFile extends SharedCloseableImpl
             }
         }
 
+        public SegmentedFile buildData(Descriptor desc, StatsMetadata stats, IndexSummaryBuilder.ReadableBoundary boundary)
+        {
+            return complete(desc.filenameFor(Component.DATA), bufferSize(stats), boundary.dataLength);
+        }
+
+        public SegmentedFile buildData(Descriptor desc, StatsMetadata stats)
+        {
+            return complete(desc.filenameFor(Component.DATA), bufferSize(stats), -1L);
+        }
+
+        public SegmentedFile buildIndex(Descriptor desc, IndexSummary indexSummary, IndexSummaryBuilder.ReadableBoundary boundary)
+        {
+            return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), boundary.indexLength);
+        }
+
+        public SegmentedFile buildIndex(Descriptor desc, IndexSummary indexSummary)
+        {
+            return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), -1L);
+        }
+
+        private int bufferSize(StatsMetadata stats)
+        {
+            return bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+        }
+
+        private int bufferSize(Descriptor desc, IndexSummary indexSummary)
+        {
+            File file = new File(desc.filenameFor(Component.PRIMARY_INDEX));
+            return bufferSize(file.length() / indexSummary.size());
+        }
+
+        /**
+            Return the buffer size for a given record size. For spinning disks always add one page.
+            For solid state disks only add one page if the chance of crossing to the next page is more
+            than a predefined value (see Config.disk_optimization_page_cross_chance).
+         */
+        static int bufferSize(long recordSize)
+        {
+            Config.DiskOptimizationStrategy strategy = DatabaseDescriptor.getDiskOptimizationStrategy();
+            if (strategy == Config.DiskOptimizationStrategy.ssd)
+            {
+                // The crossing probability is calculated assuming a uniform distribution of record
+                // start position in a page, so it's the record size modulo the page size divided by
+                // the total page size.
+                double pageCrossProbability = (recordSize % 4096) / 4096.;
+                // if the page cross probability is equal to or greater than disk_optimization_page_cross_chance, add one page
+                if ((pageCrossProbability - DatabaseDescriptor.getDiskOptimizationPageCrossChance()) > -1e-16)
+                    recordSize += 4096;
+
+                return roundBufferSize(recordSize);
+            }
+            else if (strategy == Config.DiskOptimizationStrategy.spinning)
+            {
+                return roundBufferSize(recordSize + 4096);
+            }
+            else
+            {
+                throw new IllegalStateException("Unsupported disk optimization strategy: " + strategy);
+            }
+        }
+
+        /**
+           Round up to the next multiple of 4k but no more than 64k
+         */
+        static int roundBufferSize(long size)
+        {
+            if (size <= 0)
+                return 4096;
+
+            size = (size + 4095) & ~4095;
+            return (int)Math.min(size, 1 << 16);
+        }
+
         public void serializeBounds(DataOutput out) throws IOException
         {
             out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
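
To make the new heuristic concrete, here is a small self-contained sketch
that mirrors the bufferSize()/roundBufferSize() logic added above (class and
method names are illustrative, and the >= comparison stands in for the
epsilon-based check used in the patch):

    public class BufferSizeExample
    {
        static final int PAGE = 4096;

        // Mirrors roundBufferSize(): round up to the next 4 KiB page, capped at 64 KiB.
        static int round(long size)
        {
            if (size <= 0)
                return PAGE;
            size = (size + PAGE - 1) & ~(PAGE - 1);
            return (int) Math.min(size, 1 << 16);
        }

        // Mirrors the ssd branch: add a page only when the chance of crossing into
        // the next page is at least the configured threshold.
        static int ssdBufferSize(long recordSize, double pageCrossChance)
        {
            double crossProbability = (recordSize % PAGE) / (double) PAGE;
            if (crossProbability >= pageCrossChance)
                recordSize += PAGE;
            return round(recordSize);
        }

        public static void main(String[] args)
        {
            System.out.println(ssdBufferSize(300, 0.1));    // 4096: cross chance ~0.07 < 0.1
            System.out.println(ssdBufferSize(6000, 0.1));   // 12288: ~0.46 >= 0.1, 6000 + 4096 rounds up
            System.out.println(round(6000 + PAGE));         // 12288: the spinning strategy always adds a page
        }
    }

Since the old code always used a 64 KiB buffer here, a read of a typical
small partition now touches one 4 KiB page instead of sixteen.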

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
index ea21355..024d38f 100644
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -29,9 +29,9 @@ public class ThrottledReader extends RandomAccessReader
 {
     private final RateLimiter limiter;
 
-    protected ThrottledReader(ChannelProxy channel, long overrideLength, RateLimiter limiter)
+    protected ThrottledReader(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter)
     {
-        super(channel, RandomAccessReader.DEFAULT_BUFFER_SIZE, overrideLength, BufferType.OFF_HEAP);
+        super(channel, bufferSize, overrideLength, BufferType.OFF_HEAP);
         this.limiter = limiter;
     }
 
@@ -41,8 +41,8 @@ public class ThrottledReader extends RandomAccessReader
         super.reBuffer();
     }
 
-    public static ThrottledReader open(ChannelProxy channel, long overrideLength, RateLimiter limiter)
+    public static ThrottledReader open(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter)
     {
-        return new ThrottledReader(channel, overrideLength, limiter);
+        return new ThrottledReader(channel, bufferSize, overrideLength, limiter);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index d9c7e8b..e052c0a 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.io.util.BufferedSegmentedFile;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -61,7 +62,7 @@ public class MockSchema
     public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1)));
 
     public static final IndexSummary indexSummary;
-    private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0);
+    private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
 
     public static Memtable memtable(ColumnFamilyStore cfs)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
index 3150087..4105800 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.BufferedSegmentedFile;
 import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
@@ -517,8 +518,8 @@ public class TransactionLogsTest extends AbstractTransactionalTest
             }
         }
 
-        SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), 0);
-        SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), 0);
+        SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
+        SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
 
         SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
         StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
index 71fab61..edbd603 100644
--- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
@@ -22,23 +22,61 @@ public class RandomAccessReaderTest
     @Test
     public void testReadFully() throws IOException
     {
+        testReadImpl(1, 0);
+    }
+
+    @Test
+    public void testReadLarge() throws IOException
+    {
+        testReadImpl(1000, 0);
+    }
+
+    @Test
+    public void testReadLargeWithSkip() throws IOException
+    {
+        testReadImpl(1000, 322);
+    }
+
+    @Test
+    public void testReadBufferSizeNotAligned() throws IOException
+    {
+        testReadImpl(1000, 0, 5122);
+    }
+
+    private void testReadImpl(int numIterations, int skipIterations) throws IOException
+    {
+        testReadImpl(numIterations, skipIterations, RandomAccessReader.DEFAULT_BUFFER_SIZE);
+    }
+
+    private void testReadImpl(int numIterations, int skipIterations, int bufferSize) throws IOException
+    {
         final File f = File.createTempFile("testReadFully", "1");
         final String expected = "The quick brown fox jumps over the lazy dog";
 
         SequentialWriter writer = SequentialWriter.open(f);
-        writer.write(expected.getBytes());
+        for (int i = 0; i < numIterations; i++)
+            writer.write(expected.getBytes());
         writer.finish();
 
         assert f.exists();
 
         ChannelProxy channel = new ChannelProxy(f);
-        RandomAccessReader reader = RandomAccessReader.open(channel);
+        RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L);
         assertEquals(f.getAbsolutePath(), reader.getPath());
-        assertEquals(expected.length(), reader.length());
+        assertEquals(expected.length() * numIterations, reader.length());
+
+        if (skipIterations > 0)
+        {
+            reader.seek(skipIterations * expected.length());
+        }
 
         byte[] b = new byte[expected.length()];
-        reader.readFully(b);
-        assertEquals(expected, new String(b));
+        int n = numIterations - skipIterations;
+        for (int i = 0; i < n; i++)
+        {
+            reader.readFully(b);
+            assertEquals(expected, new String(b));
+        }
 
         assertTrue(reader.isEOF());
         assertEquals(0, reader.bytesRemaining());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6422e347/test/unit/org/apache/cassandra/io/util/SegmentedFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SegmentedFileTest.java b/test/unit/org/apache/cassandra/io/util/SegmentedFileTest.java
new file mode 100644
index 0000000..03c10de
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/SegmentedFileTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.junit.Assert.assertEquals;
+
+public class SegmentedFileTest
+{
+    @Test
+    public void testRoundingBufferSize()
+    {
+        assertEquals(4096, SegmentedFile.Builder.roundBufferSize(-1L));
+        assertEquals(4096, SegmentedFile.Builder.roundBufferSize(0));
+        assertEquals(4096, SegmentedFile.Builder.roundBufferSize(1));
+        assertEquals(4096, SegmentedFile.Builder.roundBufferSize(2013));
+        assertEquals(4096, SegmentedFile.Builder.roundBufferSize(4095));
+        assertEquals(4096, SegmentedFile.Builder.roundBufferSize(4096));
+        assertEquals(8192, SegmentedFile.Builder.roundBufferSize(4097));
+        assertEquals(8192, SegmentedFile.Builder.roundBufferSize(8191));
+        assertEquals(8192, SegmentedFile.Builder.roundBufferSize(8192));
+        assertEquals(12288, SegmentedFile.Builder.roundBufferSize(8193));
+        assertEquals(65536, SegmentedFile.Builder.roundBufferSize(65535));
+        assertEquals(65536, SegmentedFile.Builder.roundBufferSize(65536));
+        assertEquals(65536, SegmentedFile.Builder.roundBufferSize(65537));
+        assertEquals(65536, SegmentedFile.Builder.roundBufferSize(10000000000000000L));
+    }
+
+    @Test
+    public void testBufferSize_ssd()
+    {
+        DatabaseDescriptor.setDiskOptimizationStrategy(Config.DiskOptimizationStrategy.ssd);
+        DatabaseDescriptor.setDiskOptimizationPageCrossChance(0.1);
+
+        assertEquals(4096, SegmentedFile.Builder.bufferSize(0));
+        assertEquals(4096, SegmentedFile.Builder.bufferSize(10));
+        assertEquals(4096, SegmentedFile.Builder.bufferSize(100));
+        assertEquals(4096, SegmentedFile.Builder.bufferSize(4096));
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(4505));   // just < (4096 + 4096 * 0.1)
+        assertEquals(12288, SegmentedFile.Builder.bufferSize(4506));  // just > (4096 + 4096 * 0.1)
+
+        DatabaseDescriptor.setDiskOptimizationPageCrossChance(0.5);
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(4506));  // just > (4096 + 4096 * 0.1)
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(6143));  // < (4096 + 4096 * 0.5)
+        assertEquals(12288, SegmentedFile.Builder.bufferSize(6144));  // = (4096 + 4096 * 0.5)
+        assertEquals(12288, SegmentedFile.Builder.bufferSize(6145));  // > (4096 + 4096 * 0.5)
+
+        DatabaseDescriptor.setDiskOptimizationPageCrossChance(1.0); // never add a page
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(8191));
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(8192));
+
+        DatabaseDescriptor.setDiskOptimizationPageCrossChance(0.0); // always add a page
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(10));
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(4096));
+    }
+
+    @Test
+    public void testBufferSize_spinning()
+    {
+        DatabaseDescriptor.setDiskOptimizationStrategy(Config.DiskOptimizationStrategy.spinning);
+
+        assertEquals(4096, SegmentedFile.Builder.bufferSize(0));
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(10));
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(100));
+        assertEquals(8192, SegmentedFile.Builder.bufferSize(4096));
+        assertEquals(12288, SegmentedFile.Builder.bufferSize(4097));
+    }
+}
