http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 16f791a..7365d40 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,49 +17,61 @@
  */
 package org.apache.cassandra.io.util;
 
-import com.google.common.util.concurrent.RateLimiter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
 
+import org.apache.cassandra.cache.ChunkCache;
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
 {
-    private static final Logger logger = LoggerFactory.getLogger(CompressedSegmentedFile.class);
-    private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
-
     public final CompressionMetadata metadata;
-    private final MmappedRegions regions;
 
-    public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata)
+    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, Config.DiskAccessMode mode)
     {
         this(channel,
-             bufferSize,
              metadata,
-             useMmap
+             mode == DiskAccessMode.mmap
              ? MmappedRegions.map(channel, metadata)
              : null);
     }
 
-    public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata, MmappedRegions regions)
+    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+    {
+        this(channel, metadata, regions, createRebufferer(channel, metadata, regions));
+    }
+
+    private static RebuffererFactory createRebufferer(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+    {
+        return ChunkCache.maybeWrap(chunkReader(channel, metadata, regions));
+    }
+
+    public static ChunkReader chunkReader(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+    {
+        return regions != null
+               ? new Mmap(channel, metadata, regions)
+               : new Standard(channel, metadata);
+    }
+
+    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions, RebuffererFactory rebufferer)
     {
-        super(new Cleanup(channel, metadata, regions), channel, bufferSize, metadata.dataLength, metadata.compressedFileLength);
+        super(new Cleanup(channel, metadata, regions, rebufferer), channel, rebufferer, metadata.compressedFileLength);
         this.metadata = metadata;
-        this.regions = regions;
     }
 
     private CompressedSegmentedFile(CompressedSegmentedFile copy)
     {
         super(copy);
         this.metadata = copy.metadata;
-        this.regions = copy.regions;
     }
 
     public ChannelProxy channel()
@@ -67,33 +79,21 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
         return channel;
     }
 
-    public MmappedRegions regions()
-    {
-        return regions;
-    }
-
     private static final class Cleanup extends SegmentedFile.Cleanup
     {
         final CompressionMetadata metadata;
-        private final MmappedRegions regions;
 
-        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions, ReaderFileProxy rebufferer)
         {
-            super(channel);
+            super(channel, rebufferer);
             this.metadata = metadata;
-            this.regions = regions;
         }
         public void tidy()
         {
-            Throwable err = regions == null ? null : regions.close(null);
-            if (err != null)
+            if (ChunkCache.instance != null)
             {
-                JVMStabilityInspector.inspectThrowable(err);
-
-                // This is not supposed to happen
-                logger.error("Error while closing mmapped regions", err);
+                ChunkCache.instance.invalidateFile(name());
             }
-
             metadata.close();
 
             super.tidy();
@@ -114,9 +114,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
     public static class Builder extends SegmentedFile.Builder
     {
         final CompressedSequentialWriter writer;
+        final Config.DiskAccessMode mode;
+
         public Builder(CompressedSequentialWriter writer)
         {
             this.writer = writer;
+            this.mode = DatabaseDescriptor.getDiskAccessMode();
         }
 
         protected CompressionMetadata metadata(String path, long overrideLength)
@@ -129,7 +132,7 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
 
         public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
         {
-            return new CompressedSegmentedFile(channel, bufferSize, metadata(channel.filePath(), overrideLength));
+            return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength), mode);
         }
     }
 
@@ -140,18 +143,216 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
         super.dropPageCache(metadata.chunkFor(before).offset);
     }
 
-    public RandomAccessReader createReader()
+    public CompressionMetadata getMetadata()
     {
-        return new CompressedRandomAccessReader.Builder(this).build();
+        return metadata;
     }
 
-    public RandomAccessReader createReader(RateLimiter limiter)
+    public long dataLength()
     {
-        return new CompressedRandomAccessReader.Builder(this).limiter(limiter).build();
+        return metadata.dataLength;
     }
 
-    public CompressionMetadata getMetadata()
+    @VisibleForTesting
+    public abstract static class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
     {
-        return metadata;
+        final CompressionMetadata metadata;
+
+        public CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata)
+        {
+            super(channel, metadata.dataLength);
+            this.metadata = metadata;
+            assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two
+        }
+
+        @VisibleForTesting
+        public double getCrcCheckChance()
+        {
+            return metadata.parameters.getCrcCheckChance();
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("CompressedChunkReader.%s(%s - %s, chunk 
length %d, data length %d)",
+                                 getClass().getSimpleName(),
+                                 channel.filePath(),
+                                 
metadata.compressor().getClass().getSimpleName(),
+                                 metadata.chunkLength(),
+                                 metadata.dataLength);
+        }
+
+        @Override
+        public int chunkSize()
+        {
+            return metadata.chunkLength();
+        }
+
+        @Override
+        public boolean alignmentRequired()
+        {
+            return true;
+        }
+
+        @Override
+        public BufferType preferredBufferType()
+        {
+            return metadata.compressor().preferredBufferType();
+        }
+
+        @Override
+        public Rebufferer instantiateRebufferer()
+        {
+            return BufferManagingRebufferer.on(this);
+        }
+    }
+
+    static class Standard extends CompressedChunkReader
+    {
+        // we read the raw compressed bytes into this buffer, then uncompress them into the provided one.
+        private final ThreadLocal<ByteBuffer> compressedHolder;
+
+        public Standard(ChannelProxy channel, CompressionMetadata metadata)
+        {
+            super(channel, metadata);
+            compressedHolder = ThreadLocal.withInitial(this::allocateBuffer);
+        }
+
+        public ByteBuffer allocateBuffer()
+        {
+            return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
+        }
+
+        public ByteBuffer allocateBuffer(int size)
+        {
+            return metadata.compressor().preferredBufferType().allocate(size);
+        }
+
+        @Override
+        public void readChunk(long position, ByteBuffer uncompressed)
+        {
+            try
+            {
+                // accesses must always be aligned
+                assert (position & -uncompressed.capacity()) == position;
+                assert position <= fileLength;
+
+                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+                ByteBuffer compressed = compressedHolder.get();
+
+                if (compressed.capacity() < chunk.length)
+                {
+                    compressed = allocateBuffer(chunk.length);
+                    compressedHolder.set(compressed);
+                }
+                else
+                {
+                    compressed.clear();
+                }
+
+                compressed.limit(chunk.length);
+                if (channel.read(compressed, chunk.offset) != chunk.length)
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+
+                compressed.flip();
+                uncompressed.clear();
+
+                try
+                {
+                    metadata.compressor().uncompress(compressed, uncompressed);
+                }
+                catch (IOException e)
+                {
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+                finally
+                {
+                    uncompressed.flip();
+                }
+
+                if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+                {
+                    compressed.rewind();
+                    int checksum = (int) metadata.checksumType.of(compressed);
+
+                    compressed.clear().limit(Integer.BYTES);
+                    if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES
+                        || compressed.getInt(0) != checksum)
+                        throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+            }
+            catch (CorruptBlockException e)
+            {
+                throw new CorruptSSTableException(e, channel.filePath());
+            }
+        }
+    }
+
+    static class Mmap extends CompressedChunkReader
+    {
+        protected final MmappedRegions regions;
+
+        public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+        {
+            super(channel, metadata);
+            this.regions = regions;
+        }
+
+        @Override
+        public void readChunk(long position, ByteBuffer uncompressed)
+        {
+            try
+            {
+                // accesses must always be aligned
+                assert (position & -uncompressed.capacity()) == position;
+                assert position <= fileLength;
+
+                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+
+                MmappedRegions.Region region = regions.floor(chunk.offset);
+                long segmentOffset = region.offset();
+                int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
+                ByteBuffer compressedChunk = region.buffer();
+
+                compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
+
+                uncompressed.clear();
+
+                try
+                {
+                    metadata.compressor().uncompress(compressedChunk, uncompressed);
+                }
+                catch (IOException e)
+                {
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+                finally
+                {
+                    uncompressed.flip();
+                }
+
+                if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+                {
+                    compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
+
+                    int checksum = (int) metadata.checksumType.of(compressedChunk);
+
+                    compressedChunk.limit(compressedChunk.capacity());
+                    if (compressedChunk.getInt() != checksum)
+                        throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+            }
+            catch (CorruptBlockException e)
+            {
+                throw new CorruptSSTableException(e, channel.filePath());
+            }
+
+        }
+
+        public void close()
+        {
+            regions.closeQuietly();
+            super.close();
+        }
     }
 }
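
A minimal sketch of how the pieces above fit together, assuming a ChannelProxy channel, a CompressionMetadata metadata and an optional MmappedRegions regions are already at hand (illustrative only, not part of the patch):

    ChunkReader chunks = CompressedSegmentedFile.chunkReader(channel, metadata, regions); // Mmap if regions != null, else Standard
    RebuffererFactory factory = ChunkCache.maybeWrap(chunks);   // wrapped by the chunk cache when it is enabled
    Rebufferer rebufferer = factory.instantiateRebufferer();
    Rebufferer.BufferHolder bh = rebufferer.rebuffer(0);
    try
    {
        ByteBuffer uncompressed = bh.buffer();                   // first chunk, already uncompressed
        // ... consume the data ...
    }
    finally
    {
        bh.release();                                            // required so cached chunks can be evicted
    }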

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index eb84a89..0c48d13 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -34,6 +34,7 @@ import com.google.common.base.Charsets;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.Throwables;
 
 public class DataIntegrityMetadata
@@ -45,21 +46,21 @@ public class DataIntegrityMetadata
 
     public static class ChecksumValidator implements Closeable
     {
-        private final Checksum checksum;
+        private final ChecksumType checksumType;
         private final RandomAccessReader reader;
         public final int chunkSize;
         private final String dataFilename;
 
         public ChecksumValidator(Descriptor descriptor) throws IOException
         {
-            this(descriptor.version.uncompressedChecksumType().newInstance(),
+            this(descriptor.version.uncompressedChecksumType(),
                  RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
                  descriptor.filenameFor(Component.DATA));
         }
 
-        public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException
+        public ChecksumValidator(ChecksumType checksumType, RandomAccessReader reader, String dataFilename) throws IOException
         {
-            this.checksum = checksum;
+            this.checksumType = checksumType;
             this.reader = reader;
             this.dataFilename = dataFilename;
             chunkSize = reader.readInt();
@@ -79,9 +80,7 @@ public class DataIntegrityMetadata
 
         public void validate(byte[] bytes, int start, int end) throws IOException
         {
-            checksum.update(bytes, start, end);
-            int current = (int) checksum.getValue();
-            checksum.reset();
+            int current = (int) checksumType.of(bytes, start, end);
             int actual = reader.readInt();
             if (current != actual)
                 throw new IOException("Corrupted File : " + dataFilename);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
index 43cef8c..e69487c 100644
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
@@ -23,6 +23,4 @@ public interface ICompressedFile
 {
     ChannelProxy channel();
     CompressionMetadata getMetadata();
-    MmappedRegions regions();
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java b/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java
new file mode 100644
index 0000000..e69da70
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java
@@ -0,0 +1,106 @@
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Rebufferer wrapper that applies rate limiting.
+ *
+ * Instantiated once per RandomAccessReader, thread-unsafe.
+ * The instances reuse themselves as the BufferHolder to avoid having to return a new object for each rebuffer call.
+ */
+public class LimitingRebufferer implements Rebufferer, Rebufferer.BufferHolder
+{
+    final private Rebufferer wrapped;
+    final private RateLimiter limiter;
+    final private int limitQuant;
+
+    private BufferHolder bufferHolder;
+    private ByteBuffer buffer;
+    private long offset;
+
+    public LimitingRebufferer(Rebufferer wrapped, RateLimiter limiter, int limitQuant)
+    {
+        this.wrapped = wrapped;
+        this.limiter = limiter;
+        this.limitQuant = limitQuant;
+    }
+
+    @Override
+    public BufferHolder rebuffer(long position)
+    {
+        bufferHolder = wrapped.rebuffer(position);
+        buffer = bufferHolder.buffer();
+        offset = bufferHolder.offset();
+        int posInBuffer = Ints.checkedCast(position - offset);
+        int remaining = buffer.limit() - posInBuffer;
+        if (remaining == 0)
+            return this;
+
+        if (remaining > limitQuant)
+        {
+            buffer.limit(posInBuffer + limitQuant); // certainly below current limit
+            remaining = limitQuant;
+        }
+        limiter.acquire(remaining);
+        return this;
+    }
+
+    @Override
+    public ChannelProxy channel()
+    {
+        return wrapped.channel();
+    }
+
+    @Override
+    public long fileLength()
+    {
+        return wrapped.fileLength();
+    }
+
+    @Override
+    public double getCrcCheckChance()
+    {
+        return wrapped.getCrcCheckChance();
+    }
+
+    @Override
+    public void close()
+    {
+        wrapped.close();
+    }
+
+    @Override
+    public void closeReader()
+    {
+        wrapped.closeReader();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "LimitingRebufferer[" + limiter.toString() + "]:" + 
wrapped.toString();
+    }
+
+    // BufferHolder methods
+
+    @Override
+    public ByteBuffer buffer()
+    {
+        return buffer;
+    }
+
+    @Override
+    public long offset()
+    {
+        return offset;
+    }
+
+    @Override
+    public void release()
+    {
+        bufferHolder.release();
+    }
+}
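
For illustration, a sketch of throttling an existing rebufferer with this wrapper; base is an assumed Rebufferer and the rate is an arbitrary example value:

    RateLimiter limiter = RateLimiter.create(16 * 1024 * 1024);  // bytes per second, illustrative
    Rebufferer limited = new LimitingRebufferer(base, limiter, RandomAccessReader.MAX_BUFFER_SIZE);
    Rebufferer.BufferHolder bh = limited.rebuffer(position);     // acquires permits for at most limitQuant bytes
    try
    {
        // ... read from bh.buffer() ...
    }
    finally
    {
        bh.release();
    }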

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
new file mode 100644
index 0000000..6c39cb1
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
@@ -0,0 +1,49 @@
+package org.apache.cassandra.io.util;
+
+/**
+ * Rebufferer for memory-mapped files. Thread-safe and shared among reader instances.
+ * This is simply a thin wrapper around MmappedRegions as the buffers there can be used directly after duplication.
+ */
+class MmapRebufferer extends AbstractReaderFileProxy implements Rebufferer, RebuffererFactory
+{
+    protected final MmappedRegions regions;
+
+    public MmapRebufferer(ChannelProxy channel, long fileLength, MmappedRegions regions)
+    {
+        super(channel, fileLength);
+        this.regions = regions;
+    }
+
+    @Override
+    public BufferHolder rebuffer(long position)
+    {
+        return regions.floor(position);
+    }
+
+    @Override
+    public Rebufferer instantiateRebufferer()
+    {
+        return this;
+    }
+
+    @Override
+    public void close()
+    {
+        regions.closeQuietly();
+    }
+
+    @Override
+    public void closeReader()
+    {
+        // Instance is shared among readers. Nothing to release.
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("%s(%s - data length %d)",
+                             getClass().getSimpleName(),
+                             channel.filePath(),
+                             fileLength());
+    }
+}
\ No newline at end of file
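
As a sketch (channel, length and regions assumed to exist), a single instance serves every reader of one file:

    RebuffererFactory factory = new MmapRebufferer(channel, length, regions);
    Rebufferer shared = factory.instantiateRebufferer();    // returns the same shared instance
    Rebufferer.BufferHolder bh = shared.rebuffer(position);  // an MmappedRegions.Region over the mapped buffer
    // ... read from bh.buffer() ...
    bh.release();                                             // no-op for mapped regions
    shared.closeReader();                                     // nothing reader-specific to release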

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/MmappedRegions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
index 8f6cd92..f269b84 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedRegions.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
@@ -22,8 +22,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
 
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
@@ -190,8 +193,20 @@ public class MmappedRegions extends SharedCloseableImpl
         assert !isCleanedUp() : "Attempted to use closed region";
         return state.floor(position);
     }
+    
+    public void closeQuietly()
+    {
+        Throwable err = close(null);
+        if (err != null)
+        {
+            JVMStabilityInspector.inspectThrowable(err);
+
+            // This is not supposed to happen
+            LoggerFactory.getLogger(getClass()).error("Error while closing mmapped regions", err);
+        }
+    }
 
-    public static final class Region
+    public static final class Region implements Rebufferer.BufferHolder
     {
         public final long offset;
         public final ByteBuffer buffer;
@@ -202,15 +217,25 @@ public class MmappedRegions extends SharedCloseableImpl
             this.buffer = buffer;
         }
 
-        public long bottom()
+        public ByteBuffer buffer()
+        {
+            return buffer.duplicate();
+        }
+
+        public long offset()
         {
             return offset;
         }
 
-        public long top()
+        public long end()
         {
             return offset + buffer.capacity();
         }
+
+        public void release()
+        {
+            // only released after no readers are present
+        }
     }
 
     private static final class State
@@ -260,7 +285,7 @@ public class MmappedRegions extends SharedCloseableImpl
 
         private Region floor(long position)
         {
-            assert 0 <= position && position < length : String.format("%d >= 
%d", position, length);
+            assert 0 <= position && position <= length : String.format("%d > 
%d", position, length);
 
             int idx = Arrays.binarySearch(offsets, 0, last +1, position);
             assert idx != -1 : String.format("Bad position %d for regions %s, last %d in %s", position, Arrays.toString(offsets), last, channel);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 5f56ff6..d514bf8 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -19,30 +19,29 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 
-import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class MmappedSegmentedFile extends SegmentedFile
 {
     private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
 
-    private final MmappedRegions regions;
+    public MmappedSegmentedFile(ChannelProxy channel, long length, MmappedRegions regions)
+    {
+        this(channel, new MmapRebufferer(channel, length, regions), length);
+    }
 
-    public MmappedSegmentedFile(ChannelProxy channel, int bufferSize, long length, MmappedRegions regions)
+    public MmappedSegmentedFile(ChannelProxy channel, RebuffererFactory rebufferer, long length)
     {
-        super(new Cleanup(channel, regions), channel, bufferSize, length);
-        this.regions = regions;
+        super(new Cleanup(channel, rebufferer), channel, rebufferer, length);
     }
 
     private MmappedSegmentedFile(MmappedSegmentedFile copy)
     {
         super(copy);
-        this.regions = copy.regions;
     }
 
     public MmappedSegmentedFile sharedCopy()
@@ -50,49 +49,6 @@ public class MmappedSegmentedFile extends SegmentedFile
         return new MmappedSegmentedFile(this);
     }
 
-    public RandomAccessReader createReader()
-    {
-        return new RandomAccessReader.Builder(channel)
-               .overrideLength(length)
-               .regions(regions)
-               .build();
-    }
-
-    public RandomAccessReader createReader(RateLimiter limiter)
-    {
-        return new RandomAccessReader.Builder(channel)
-               .overrideLength(length)
-               .bufferSize(bufferSize)
-               .regions(regions)
-               .limiter(limiter)
-               .build();
-    }
-
-    private static final class Cleanup extends SegmentedFile.Cleanup
-    {
-        private final MmappedRegions regions;
-
-        Cleanup(ChannelProxy channel, MmappedRegions regions)
-        {
-            super(channel);
-            this.regions = regions;
-        }
-
-        public void tidy()
-        {
-            Throwable err = regions.close(null);
-            if (err != null)
-            {
-                JVMStabilityInspector.inspectThrowable(err);
-
-                // This is not supposed to happen
-                logger.error("Error while closing mmapped regions", err);
-            }
-
-            super.tidy();
-        }
-    }
-
     /**
      * Overrides the default behaviour to create segments of a maximum size.
      */
@@ -110,7 +66,7 @@ public class MmappedSegmentedFile extends SegmentedFile
             long length = overrideLength > 0 ? overrideLength : channel.size();
             updateRegions(channel, length);
 
-            return new MmappedSegmentedFile(channel, bufferSize, length, regions.sharedCopy());
+            return new MmappedSegmentedFile(channel, length, regions.sharedCopy());
         }
 
         private void updateRegions(ChannelProxy channel, long length)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index e6383cf..725b367 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -21,11 +21,13 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.util.Rebufferer.BufferHolder;
 import org.apache.cassandra.utils.memory.BufferPool;
 
 public class RandomAccessReader extends RebufferingInputStream implements FileDataInput
@@ -41,60 +43,22 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
     //       and because our BufferPool currently has a maximum allocation size of this.
     public static final int MAX_BUFFER_SIZE = 1 << 16; // 64k
 
-    // the IO channel to the file, we do not own a reference to this due to
-    // performance reasons (CASSANDRA-9379) so it's up to the owner of the RAR to
-    // ensure that the channel stays open and that it is closed afterwards
-    protected final ChannelProxy channel;
-
-    // optional memory mapped regions for the channel
-    protected final MmappedRegions regions;
-
-    // An optional limiter that will throttle the amount of data we read
-    protected final RateLimiter limiter;
-
-    // the file length, this can be overridden at construction to a value shorter
-    // than the true length of the file; if so, it acts as an imposed limit on reads,
-    // required when opening sstables early not to read past the mark
-    private final long fileLength;
-
-    // the buffer size for buffered readers
-    protected final int bufferSize;
-
-    // the buffer type for buffered readers
-    protected final BufferType bufferType;
-
-    // offset from the beginning of the file
-    protected long bufferOffset;
-
     // offset of the last file mark
     protected long markedPointer;
 
-    protected RandomAccessReader(Builder builder)
-    {
-        super(builder.createBuffer());
+    @VisibleForTesting
+    final Rebufferer rebufferer;
+    BufferHolder bufferHolder = Rebufferer.EMPTY;
 
-        this.channel = builder.channel;
-        this.regions = builder.regions;
-        this.limiter = builder.limiter;
-        this.fileLength = builder.overrideLength <= 0 ? builder.channel.size() 
: builder.overrideLength;
-        this.bufferSize = builder.bufferSize;
-        this.bufferType = builder.bufferType;
-        this.buffer = builder.buffer;
-    }
-
-    protected static ByteBuffer allocateBuffer(int size, BufferType bufferType)
+    protected RandomAccessReader(Rebufferer rebufferer)
     {
-        return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN);
+        super(Rebufferer.EMPTY.buffer());
+        this.rebufferer = rebufferer;
     }
 
-    protected void releaseBuffer()
+    public static ByteBuffer allocateBuffer(int size, BufferType bufferType)
     {
-        if (buffer != null)
-        {
-            if (regions == null)
-                BufferPool.put(buffer);
-            buffer = null;
-        }
+        return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN);
     }
 
     /**
@@ -105,80 +69,40 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
         if (isEOF())
             return;
 
-        if (regions == null)
-            reBufferStandard();
-        else
-            reBufferMmap();
-
-        if (limiter != null)
-            limiter.acquire(buffer.remaining());
-
-        assert buffer.order() == ByteOrder.BIG_ENDIAN : "Buffer must have BIG 
ENDIAN byte ordering";
+        reBufferAt(current());
     }
 
-    protected void reBufferStandard()
+    public void reBufferAt(long position)
     {
-        bufferOffset += buffer.position();
-        assert bufferOffset < fileLength;
-
-        buffer.clear();
-        long position = bufferOffset;
-        long limit = bufferOffset;
+        bufferHolder.release();
+        bufferHolder = rebufferer.rebuffer(position);
+        buffer = bufferHolder.buffer();
+        buffer.position(Ints.checkedCast(position - bufferHolder.offset()));
 
-        long pageAligedPos = position & ~4095;
-        // Because the buffer capacity is a multiple of the page size, we read less
-        // the first time and then we should read at page boundaries only,
-        // unless the user seeks elsewhere
-        long upperLimit = Math.min(fileLength, pageAligedPos + 
buffer.capacity());
-        buffer.limit((int)(upperLimit - position));
-        while (buffer.hasRemaining() && limit < upperLimit)
-        {
-            int n = channel.read(buffer, position);
-            if (n < 0)
-                throw new FSReadError(new IOException("Unexpected end of 
file"), channel.filePath());
-
-            position += n;
-            limit = bufferOffset + buffer.position();
-        }
-
-        buffer.flip();
-    }
-
-    protected void reBufferMmap()
-    {
-        long position = bufferOffset + buffer.position();
-        assert position < fileLength;
-
-        MmappedRegions.Region region = regions.floor(position);
-        bufferOffset = region.bottom();
-        buffer = region.buffer.duplicate();
-        buffer.position(Ints.checkedCast(position - bufferOffset));
-
-        if (limiter != null && bufferSize < buffer.remaining())
-        { // ensure accurate throttling
-            buffer.limit(buffer.position() + bufferSize);
-        }
+        assert buffer.order() == ByteOrder.BIG_ENDIAN : "Buffer must have BIG 
ENDIAN byte ordering";
     }
 
     @Override
     public long getFilePointer()
     {
+        if (buffer == null)     // closed already
+            return rebufferer.fileLength();
         return current();
     }
 
     protected long current()
     {
-        return bufferOffset + (buffer == null ? 0 : buffer.position());
+        return bufferHolder.offset() + buffer.position();
     }
 
     public String getPath()
     {
-        return channel.filePath();
+        return getChannel().filePath();
     }
 
     public ChannelProxy getChannel()
     {
-        return channel;
+        return rebufferer.channel();
     }
 
     @Override
@@ -242,12 +166,14 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
     @Override
     public void close()
     {
-           //make idempotent
+        // close needs to be idempotent.
         if (buffer == null)
             return;
 
-        bufferOffset += buffer.position();
-        releaseBuffer();
+        bufferHolder.release();
+        rebufferer.closeReader();
+        buffer = null;
+        bufferHolder = null;
 
         //For performance reasons we don't keep a reference to the file
         //channel so we don't close it
@@ -256,7 +182,7 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
     @Override
     public String toString()
     {
-        return getClass().getSimpleName() + "(filePath='" + channel + "')";
+        return getClass().getSimpleName() + ':' + rebufferer.toString();
     }
 
     /**
@@ -281,26 +207,17 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
         if (buffer == null)
             throw new IllegalStateException("Attempted to seek in a closed 
RAR");
 
-        if (newPosition >= length()) // it is save to call length() in 
read-only mode
-        {
-            if (newPosition > length())
-                throw new IllegalArgumentException(String.format("Unable to 
seek to position %d in %s (%d bytes) in read-only mode",
-                                                             newPosition, 
getPath(), length()));
-            buffer.limit(0);
-            bufferOffset = newPosition;
-            return;
-        }
-
+        long bufferOffset = bufferHolder.offset();
         if (newPosition >= bufferOffset && newPosition < bufferOffset + 
buffer.limit())
         {
             buffer.position((int) (newPosition - bufferOffset));
             return;
         }
-        // Set current location to newPosition and clear buffer so reBuffer calculates from newPosition
-        bufferOffset = newPosition;
-        buffer.clear();
-        reBuffer();
-        assert current() == newPosition;
+
+        if (newPosition > length())
+            throw new IllegalArgumentException(String.format("Unable to seek 
to position %d in %s (%d bytes) in read-only mode",
+                                                         newPosition, 
getPath(), length()));
+        reBufferAt(newPosition);
     }
 
     /**
@@ -353,7 +270,7 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
 
     public long length()
     {
-        return fileLength;
+        return rebufferer.fileLength();
     }
 
     public long getPosition()
@@ -361,17 +278,38 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
         return current();
     }
 
+    public double getCrcCheckChance()
+    {
+        return rebufferer.getCrcCheckChance();
+    }
+
+    protected static Rebufferer instantiateRebufferer(RebuffererFactory 
fileRebufferer, RateLimiter limiter)
+    {
+        Rebufferer rebufferer = fileRebufferer.instantiateRebufferer();
+
+        if (limiter != null)
+            rebufferer = new LimitingRebufferer(rebufferer, limiter, 
MAX_BUFFER_SIZE);
+
+        return rebufferer;
+    }
+
+    public static RandomAccessReader build(SegmentedFile file, RateLimiter 
limiter)
+    {
+        return new 
RandomAccessReader(instantiateRebufferer(file.rebuffererFactory(), limiter));
+    }
+
+    public static Builder builder(ChannelProxy channel)
+    {
+        return new Builder(channel);
+    }
+
     public static class Builder
     {
         // The NIO file channel or an empty channel
         public final ChannelProxy channel;
 
-        // We override the file length when we open sstables early, so that we do not
-        // read past the early mark
-        public long overrideLength;
-
         // The size of the buffer for buffered readers
-        public int bufferSize;
+        protected int bufferSize;
 
         // The type of the buffer for buffered readers
         public BufferType bufferType;
@@ -379,20 +317,20 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
         // The buffer
         public ByteBuffer buffer;
 
+        // An optional limiter that will throttle the amount of data we read
+        public RateLimiter limiter;
+
         // The mmap segments for mmap readers
         public MmappedRegions regions;
 
-        // An optional limiter that will throttle the amount of data we read
-        public RateLimiter limiter;
+        // Compression for compressed readers
+        public CompressionMetadata compression;
 
         public Builder(ChannelProxy channel)
         {
             this.channel = channel;
-            this.overrideLength = -1L;
             this.bufferSize = DEFAULT_BUFFER_SIZE;
             this.bufferType = BufferType.OFF_HEAP;
-            this.regions = null;
-            this.limiter = null;
         }
 
         /** The buffer size is typically already page aligned but if that is not the case
@@ -400,38 +338,30 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
          * buffer size unless we are throttling, in which case we may as well read the maximum
          * directly since the intention is to read the full file, see CASSANDRA-8630.
          * */
-        private void setBufferSize()
+        private int adjustedBufferSize()
         {
             if (limiter != null)
-            {
-                bufferSize = MAX_BUFFER_SIZE;
-                return;
-            }
-
-            if ((bufferSize & ~4095) != bufferSize)
-            { // should already be a page size multiple but if that's not case round it up
-                bufferSize = (bufferSize + 4095) & ~4095;
-            }
+                return MAX_BUFFER_SIZE;
 
-            bufferSize = Math.min(MAX_BUFFER_SIZE, bufferSize);
+            // should already be a page size multiple but if that's not case round it up
+            int wholePageSize = (bufferSize + 4095) & ~4095;
+            return Math.min(MAX_BUFFER_SIZE, wholePageSize);
         }
 
-        protected ByteBuffer createBuffer()
+        protected Rebufferer createRebufferer()
         {
-            setBufferSize();
-
-            buffer = regions == null
-                     ? allocateBuffer(bufferSize, bufferType)
-                     : regions.floor(0).buffer.duplicate();
-
-            buffer.limit(0);
-            return buffer;
+            return instantiateRebufferer(chunkReader(), limiter);
         }
 
-        public Builder overrideLength(long overrideLength)
+        public RebuffererFactory chunkReader()
         {
-            this.overrideLength = overrideLength;
-            return this;
+            if (compression != null)
+                return CompressedSegmentedFile.chunkReader(channel, 
compression, regions);
+            if (regions != null)
+                return new MmapRebufferer(channel, -1, regions);
+
+            int adjustedSize = adjustedBufferSize();
+            return new SimpleChunkReader(channel, -1, bufferType, 
adjustedSize);
         }
 
         public Builder bufferSize(int bufferSize)
@@ -455,6 +385,12 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
             return this;
         }
 
+        public Builder compression(CompressionMetadata metadata)
+        {
+            this.compression = metadata;
+            return this;
+        }
+
         public Builder limiter(RateLimiter limiter)
         {
             this.limiter = limiter;
@@ -463,12 +399,12 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
 
         public RandomAccessReader build()
         {
-            return new RandomAccessReader(this);
+            return new RandomAccessReader(createRebufferer());
         }
 
         public RandomAccessReader buildWithChannel()
         {
-            return new RandomAccessReaderWithOwnChannel(this);
+            return new RandomAccessReaderWithOwnChannel(createRebufferer());
         }
     }
 
@@ -479,9 +415,9 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
     // not have a shared channel.
     public static class RandomAccessReaderWithOwnChannel extends 
RandomAccessReader
     {
-        protected RandomAccessReaderWithOwnChannel(Builder builder)
+        protected RandomAccessReaderWithOwnChannel(Rebufferer rebufferer)
         {
-            super(builder);
+            super(rebufferer);
         }
 
         @Override
@@ -493,7 +429,14 @@ public class RandomAccessReader extends 
RebufferingInputStream implements FileDa
             }
             finally
             {
-                channel.close();
+                try
+                {
+                    rebufferer.close();
+                }
+                finally
+                {
+                    getChannel().close();
+                }
             }
         }
     }
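
A sketch of the two ways a reader is now obtained (segmentedFile, channel and metadata assumed to exist):

    // From a SegmentedFile, optionally rate-limited (pass null for no throttling):
    RandomAccessReader r1 = RandomAccessReader.build(segmentedFile, null);

    // Directly over a channel via the builder; compression(metadata) would select a compressed rebufferer:
    RandomAccessReader r2 = RandomAccessReader.builder(channel)
                                              .bufferSize(65536)
                                              .build();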

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/ReaderFileProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ReaderFileProxy.java b/src/java/org/apache/cassandra/io/util/ReaderFileProxy.java
new file mode 100644
index 0000000..3ddb143
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ReaderFileProxy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+/**
+ * Base class for the RandomAccessReader components that implement reading.
+ */
+public interface ReaderFileProxy extends AutoCloseable
+{
+    void close();               // no checked exceptions
+
+    ChannelProxy channel();
+
+    long fileLength();
+
+    /**
+     * Needed for tests. Returns the table's CRC check chance, which is only set for compressed tables.
+     */
+    double getCrcCheckChance();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/Rebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Rebufferer.java b/src/java/org/apache/cassandra/io/util/Rebufferer.java
new file mode 100644
index 0000000..e88c7cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/Rebufferer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Rebufferer for reading data by a RandomAccessReader.
+ */
+public interface Rebufferer extends ReaderFileProxy
+{
+    /**
+     * Rebuffer (move on or seek to) a given position, and return a buffer that can be used there.
+     * The only guarantee about the size of the returned data is that unless rebuffering at the end of the file,
+     * the buffer will not be empty and will contain the requested position, i.e.
+     * {@code offset <= position < offset + bh.buffer().limit()}, but the buffer will not be positioned there.
+     */
+    BufferHolder rebuffer(long position);
+
+    /**
+     * Called when a reader is closed. Should clean up reader-specific data.
+     */
+    void closeReader();
+
+    public interface BufferHolder
+    {
+        /**
+         * Returns a usable buffer (i.e. one whose position and limit can be freely modified). Its limit will be set
+         * to the size of the available data in the buffer.
+         * The buffer must be treated as read-only.
+         */
+        ByteBuffer buffer();
+
+        /**
+         * Position in the file of the start of the buffer.
+         */
+        long offset();
+
+        /**
+         * To be called when this buffer is no longer in use. Must be called for all BufferHolders, or ChunkCache
+         * will not be able to free blocks.
+         */
+        void release();
+    }
+
+    static final BufferHolder EMPTY = new BufferHolder()
+    {
+        final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+        @Override
+        public ByteBuffer buffer()
+        {
+            return EMPTY_BUFFER;
+        }
+
+        @Override
+        public long offset()
+        {
+            return 0;
+        }
+
+        @Override
+        public void release()
+        {
+            // nothing to do
+        }
+    };
+}
\ No newline at end of file
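
A sketch of the rebuffer/release protocol from the reader's side (rebufferer and position assumed; this mirrors RandomAccessReader.reBufferAt):

    Rebufferer.BufferHolder bh = rebufferer.rebuffer(position);
    try
    {
        ByteBuffer buf = bh.buffer();                              // limit marks the available data; content is read-only
        buf.position(Ints.checkedCast(position - bh.offset()));    // the holder does not pre-position the buffer
        // ... read from buf ...
    }
    finally
    {
        bh.release();   // mandatory, otherwise ChunkCache cannot free the block
    }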

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebuffererFactory.java b/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
new file mode 100644
index 0000000..ec35f0b
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+/**
+ * Interface for the classes that can be used to instantiate rebufferers over a given file.
+ *
+ * These come in two types:
+ *  - chunk sources (e.g. SimpleReadRebufferer) which instantiate a buffer-managing rebufferer referencing
+ *    themselves.
+ *  - thread-safe shared rebufferers (e.g. MmapRebufferer) which directly return themselves.
+ */
+public interface RebuffererFactory extends ReaderFileProxy
+{
+    Rebufferer instantiateRebufferer();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index ab2d291..62e14ba 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -21,7 +21,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
-import java.util.function.Supplier;
 
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -52,27 +51,21 @@ import static 
org.apache.cassandra.utils.Throwables.maybeFail;
 public abstract class SegmentedFile extends SharedCloseableImpl
 {
     public final ChannelProxy channel;
-    public final int bufferSize;
-    public final long length;
 
     // This differs from length for compressed files (but we still need length for
     // SegmentIterator because offsets in the file are relative to the 
uncompressed size)
     public final long onDiskLength;
 
     /**
-     * Use getBuilder to get a Builder to construct a SegmentedFile.
+     * Rebufferer to use to construct RandomAccessReaders.
      */
-    SegmentedFile(Cleanup cleanup, ChannelProxy channel, int bufferSize, long 
length)
-    {
-        this(cleanup, channel, bufferSize, length, length);
-    }
+    private final RebuffererFactory rebufferer;
 
-    protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, int 
bufferSize, long length, long onDiskLength)
+    protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, 
RebuffererFactory rebufferer, long onDiskLength)
     {
         super(cleanup);
+        this.rebufferer = rebufferer;
         this.channel = channel;
-        this.bufferSize = bufferSize;
-        this.length = length;
         this.onDiskLength = onDiskLength;
     }
 
@@ -80,8 +73,7 @@ public abstract class SegmentedFile extends 
SharedCloseableImpl
     {
         super(copy);
         channel = copy.channel;
-        bufferSize = copy.bufferSize;
-        length = copy.length;
+        rebufferer = copy.rebufferer;
         onDiskLength = copy.onDiskLength;
     }
 
@@ -90,12 +82,24 @@ public abstract class SegmentedFile extends 
SharedCloseableImpl
         return channel.filePath();
     }
 
+    public long dataLength()
+    {
+        return rebufferer.fileLength();
+    }
+
+    public RebuffererFactory rebuffererFactory()
+    {
+        return rebufferer;
+    }
+
     protected static class Cleanup implements RefCounted.Tidy
     {
         final ChannelProxy channel;
-        protected Cleanup(ChannelProxy channel)
+        final ReaderFileProxy rebufferer;
+        protected Cleanup(ChannelProxy channel, ReaderFileProxy rebufferer)
         {
             this.channel = channel;
+            this.rebufferer = rebufferer;
         }
 
         public String name()
@@ -105,7 +109,14 @@ public abstract class SegmentedFile extends 
SharedCloseableImpl
 
         public void tidy()
         {
-            channel.close();
+            try
+            {
+                channel.close();
+            }
+            finally
+            {
+                rebufferer.close();
+            }
         }
     }
 
@@ -113,19 +124,12 @@ public abstract class SegmentedFile extends 
SharedCloseableImpl
 
     public RandomAccessReader createReader()
     {
-        return new RandomAccessReader.Builder(channel)
-               .overrideLength(length)
-               .bufferSize(bufferSize)
-               .build();
+        return RandomAccessReader.build(this, null);
     }
 
     public RandomAccessReader createReader(RateLimiter limiter)
     {
-        return new RandomAccessReader.Builder(channel)
-               .overrideLength(length)
-               .bufferSize(bufferSize)
-               .limiter(limiter)
-               .build();
+        return RandomAccessReader.build(this, limiter);
     }
 
     public FileDataInput createReader(long position)
@@ -308,7 +312,7 @@ public abstract class SegmentedFile extends 
SharedCloseableImpl
     @Override
     public String toString() {
         return getClass().getSimpleName() + "(path='" + path() + '\'' +
-               ", length=" + length +
+               ", length=" + rebufferer.fileLength() +
                ')';
-}
+    }
 }
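
A usage sketch: readers borrow the file's shared rebufferer factory, and closing a reader does not close the underlying channel (segmentedFile assumed to exist):

    try (RandomAccessReader reader = segmentedFile.createReader())   // or createReader(limiter)
    {
        reader.seek(0);
        // ... read ...
    }
    // the channel and the rebufferer are closed by Cleanup.tidy() when the last SegmentedFile reference is released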

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
new file mode 100644
index 0000000..7bfb57b
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.compress.BufferType;
+
+class SimpleChunkReader extends AbstractReaderFileProxy implements ChunkReader
+{
+    private final int bufferSize;
+    private final BufferType bufferType;
+
+    public SimpleChunkReader(ChannelProxy channel, long fileLength, BufferType 
bufferType, int bufferSize)
+    {
+        super(channel, fileLength);
+        this.bufferSize = bufferSize;
+        this.bufferType = bufferType;
+    }
+
+    @Override
+    public void readChunk(long position, ByteBuffer buffer)
+    {
+        buffer.clear();
+        channel.read(buffer, position);
+        buffer.flip();
+    }
+
+    @Override
+    public int chunkSize()
+    {
+        return bufferSize;
+    }
+
+    @Override
+    public BufferType preferredBufferType()
+    {
+        return bufferType;
+    }
+
+    @Override
+    public boolean alignmentRequired()
+    {
+        return false;
+    }
+
+    @Override
+    public Rebufferer instantiateRebufferer()
+    {
+        return BufferManagingRebufferer.on(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("%s(%s - chunk length %d, data length %d)",
+                             getClass().getSimpleName(),
+                             channel.filePath(),
+                             bufferSize,
+                             fileLength());
+    }
+}
\ No newline at end of file
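
A rough usage sketch for the new reader (not part of the patch): the caller supplies a buffer of chunkSize() bytes, and the same object doubles as a RebuffererFactory. The file path is made up, and since the class is package-private this would only compile inside org.apache.cassandra.io.util:

    try (ChannelProxy channel = new ChannelProxy("/tmp/example-Data.db"))
    {
        ChunkReader chunks = new SimpleChunkReader(channel, channel.size(), BufferType.OFF_HEAP, BufferPool.CHUNK_SIZE);
        ByteBuffer buffer = ByteBuffer.allocateDirect(chunks.chunkSize());
        chunks.readChunk(0, buffer);                              // position is expected to be chunk-aligned
        Rebufferer direct = chunks.instantiateRebufferer();       // uncached path
        RebuffererFactory cached = ChunkCache.maybeWrap(chunks);  // cached path, when the chunk cache is enabled
    }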

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/metrics/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CacheMetrics.java 
b/src/java/org/apache/cassandra/metrics/CacheMetrics.java
index 151268b..e623dcb 100644
--- a/src/java/org/apache/cassandra/metrics/CacheMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CacheMetrics.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.RatioGauge;
@@ -56,7 +54,7 @@ public class CacheMetrics
      * @param type Type of Cache to identify metrics.
      * @param cache Cache to measure metrics
      */
-    public CacheMetrics(String type, final ICache cache)
+    public CacheMetrics(String type, final ICache<?, ?> cache)
     {
         MetricNameFactory factory = new DefaultNameFactory("Cache", type);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/metrics/CacheMissMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CacheMissMetrics.java 
b/src/java/org/apache/cassandra/metrics/CacheMissMetrics.java
new file mode 100644
index 0000000..19d61ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CacheMissMetrics.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.RatioGauge;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.cache.CacheSize;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Metrics for caches that track misses and requests rather than hits; hit rates are derived from the two.
+ */
+public class CacheMissMetrics
+{
+    /** Cache capacity in bytes */
+    public final Gauge<Long> capacity;
+    /** Total number of cache misses */
+    public final Meter misses;
+    /** Total number of cache requests */
+    public final Meter requests;
+    /** Latency of misses */
+    public final Timer missLatency;
+    /** all time cache hit rate */
+    public final Gauge<Double> hitRate;
+    /** 1m hit rate */
+    public final Gauge<Double> oneMinuteHitRate;
+    /** 5m hit rate */
+    public final Gauge<Double> fiveMinuteHitRate;
+    /** 15m hit rate */
+    public final Gauge<Double> fifteenMinuteHitRate;
+    /** Total size of cache, in bytes */
+    public final Gauge<Long> size;
+    /** Total number of cache entries */
+    public final Gauge<Integer> entries;
+
+    /**
+     * Create metrics for given cache.
+     *
+     * @param type Type of Cache to identify metrics.
+     * @param cache Cache to measure metrics
+     */
+    public CacheMissMetrics(String type, final CacheSize cache)
+    {
+        MetricNameFactory factory = new DefaultNameFactory("Cache", type);
+
+        capacity = Metrics.register(factory.createMetricName("Capacity"), 
(Gauge<Long>) cache::capacity);
+        misses = Metrics.meter(factory.createMetricName("Misses"));
+        requests = Metrics.meter(factory.createMetricName("Requests"));
+        missLatency = Metrics.timer(factory.createMetricName("MissLatency"));
+        hitRate = Metrics.register(factory.createMetricName("HitRate"), new 
RatioGauge()
+        {
+            @Override
+            public Ratio getRatio()
+            {
+                long req = requests.getCount();
+                long mis = misses.getCount();
+                return Ratio.of(req - mis, req);
+            }
+        });
+        oneMinuteHitRate = 
Metrics.register(factory.createMetricName("OneMinuteHitRate"), new RatioGauge()
+        {
+            protected Ratio getRatio()
+            {
+                double req = requests.getOneMinuteRate();
+                double mis = misses.getOneMinuteRate();
+                return Ratio.of(req - mis, req);
+            }
+        });
+        fiveMinuteHitRate = 
Metrics.register(factory.createMetricName("FiveMinuteHitRate"), new RatioGauge()
+        {
+            protected Ratio getRatio()
+            {
+                double req = requests.getFiveMinuteRate();
+                double mis = misses.getFiveMinuteRate();
+                return Ratio.of(req - mis, req);
+            }
+        });
+        fifteenMinuteHitRate = 
Metrics.register(factory.createMetricName("FifteenMinuteHitRate"), new 
RatioGauge()
+        {
+            protected Ratio getRatio()
+            {
+                double req = requests.getFifteenMinuteRate();
+                double mis = misses.getFifteenMinuteRate();
+                return Ratio.of(req - mis, req);
+            }
+        });
+        size = Metrics.register(factory.createMetricName("Size"), 
(Gauge<Long>) cache::weightedSize);
+        entries = Metrics.register(factory.createMetricName("Entries"), 
(Gauge<Integer>) cache::size);
+    }
+
+    public void reset()
+    {
+        requests.mark(-requests.getCount());
+        misses.mark(-misses.getCount());
+    }
+}
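
Because this class only records misses and requests, every hit-rate gauge is derived by subtraction. A quick worked example with made-up counts, not part of the patch:

    // requests = 10_000, misses = 1_500
    // hitRate  = (10_000 - 1_500) / 10_000 = 0.85
    // when requests == 0, RatioGauge.Ratio reports NaN rather than dividing by zero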

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 55ac7ac..c7d5f98 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -60,7 +60,7 @@ public class CompressedInputStream extends InputStream
     // number of bytes in the buffer that are actually valid
     protected int validBufferBytes = -1;
 
-    private final Checksum checksum;
+    private final ChecksumType checksumType;
 
     // raw checksum bytes
     private final byte[] checksumBytes = new byte[4];
@@ -76,11 +76,11 @@ public class CompressedInputStream extends InputStream
     public CompressedInputStream(InputStream source, CompressionInfo info, 
ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
     {
         this.info = info;
-        this.checksum =  checksumType.newInstance();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
         this.dataBuffer = new 
ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
         this.crcCheckChanceSupplier = crcCheckChanceSupplier;
+        this.checksumType = checksumType;
 
         new Thread(new Reader(source, info, dataBuffer)).start();
     }
@@ -122,14 +122,11 @@ public class CompressedInputStream extends InputStream
         // validate crc randomly
         if (this.crcCheckChanceSupplier.get() > 
ThreadLocalRandom.current().nextDouble())
         {
-            checksum.update(compressed, 0, compressed.length - 
checksumBytes.length);
+            int checksum = (int) checksumType.of(compressed, 0, 
compressed.length - checksumBytes.length);
 
             System.arraycopy(compressed, compressed.length - 
checksumBytes.length, checksumBytes, 0, checksumBytes.length);
-            if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue())
+            if (Ints.fromByteArray(checksumBytes) != checksum)
                 throw new IOException("CRC unmatched");
-
-            // reset checksum object back to the original (blank) state
-            checksum.reset();
         }
 
         // buffer offset is always aligned

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index fb6e1fb..2118308 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1179,9 +1179,18 @@ public class NodeProbe implements AutoCloseable
                             
CassandraMetricsRegistry.JmxGaugeMBean.class).getValue();
                 case "Requests":
                 case "Hits":
+                case "Misses":
                     return JMX.newMBeanProxy(mbeanServerConn,
                             new 
ObjectName("org.apache.cassandra.metrics:type=Cache,scope=" + cacheType + 
",name=" + metricName),
                             
CassandraMetricsRegistry.JmxMeterMBean.class).getCount();
+                case "MissLatency":
+                    return JMX.newMBeanProxy(mbeanServerConn,
+                            new 
ObjectName("org.apache.cassandra.metrics:type=Cache,scope=" + cacheType + 
",name=" + metricName),
+                            
CassandraMetricsRegistry.JmxTimerMBean.class).getMean();
+                case "MissLatencyUnit":
+                    return JMX.newMBeanProxy(mbeanServerConn,
+                            new 
ObjectName("org.apache.cassandra.metrics:type=Cache,scope=" + cacheType + 
",name=MissLatency"),
+                            
CassandraMetricsRegistry.JmxTimerMBean.class).getDurationUnit();
                 default:
                     throw new RuntimeException("Unknown cache metric name.");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/tools/nodetool/Info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Info.java 
b/src/java/org/apache/cassandra/tools/nodetool/Info.java
index 268d9df..c37f3b8 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Info.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Info.java
@@ -117,6 +117,28 @@ public class Info extends NodeToolCmd
                 probe.getCacheMetric("CounterCache", "HitRate"),
                 cacheService.getCounterCacheSavePeriodInSeconds());
 
+        // Chunk Cache: Entries, Size, Capacity, Misses, Requests, HitRate, MissLatency
+        try
+        {
+            System.out.printf("%-23s: entries %d, size %s, capacity %s, %d 
misses, %d requests, %.3f recent hit rate, %.3f %s miss latency%n",
+                    "Chunk Cache",
+                    probe.getCacheMetric("ChunkCache", "Entries"),
+                    FileUtils.stringifyFileSize((long) 
probe.getCacheMetric("ChunkCache", "Size")),
+                    FileUtils.stringifyFileSize((long) 
probe.getCacheMetric("ChunkCache", "Capacity")),
+                    probe.getCacheMetric("ChunkCache", "Misses"),
+                    probe.getCacheMetric("ChunkCache", "Requests"),
+                    probe.getCacheMetric("ChunkCache", "HitRate"),
+                    probe.getCacheMetric("ChunkCache", "MissLatency"),
+                    probe.getCacheMetric("ChunkCache", "MissLatencyUnit"));
+        }
+        catch (RuntimeException e)
+        {
+            if (!(e.getCause() instanceof InstanceNotFoundException))
+                throw e;
+
+            // Chunk cache is not on.
+        }
+
         // check if node is already joined, before getting tokens, since it 
throws exception if not.
         if (probe.isJoined())
         {
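
For illustration only, with made-up figures, the new block prints a line of the following shape in nodetool info output (sizes come from FileUtils.stringifyFileSize, and the latency unit from the timer's duration unit):

    Chunk Cache            : entries 4096, size 256 MB, capacity 480 MB, 1500 misses, 120000 requests, 0.988 recent hit rate, 285.400 microseconds miss latency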

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/utils/ChecksumType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ChecksumType.java 
b/src/java/org/apache/cassandra/utils/ChecksumType.java
index c9a1eb8..3fa245b 100644
--- a/src/java/org/apache/cassandra/utils/ChecksumType.java
+++ b/src/java/org/apache/cassandra/utils/ChecksumType.java
@@ -24,7 +24,7 @@ import java.util.zip.Adler32;
 
 public enum ChecksumType
 {
-    Adler32()
+    Adler32
     {
 
         @Override
@@ -40,7 +40,7 @@ public enum ChecksumType
         }
 
     },
-    CRC32()
+    CRC32
     {
 
         @Override
@@ -58,6 +58,28 @@ public enum ChecksumType
     };
 
     public abstract Checksum newInstance();
-
     public abstract void update(Checksum checksum, ByteBuffer buf);
+
+    private ThreadLocal<Checksum> instances = 
ThreadLocal.withInitial(this::newInstance);
+
+    public Checksum threadLocalInstance()
+    {
+        return instances.get();
+    }
+
+    public long of(ByteBuffer buf)
+    {
+        Checksum checksum = instances.get();
+        checksum.reset();
+        update(checksum, buf);
+        return checksum.getValue();
+    }
+
+    public long of(byte[] data, int off, int len)
+    {
+        Checksum checksum = instances.get();
+        checksum.reset();
+        checksum.update(data, off, len);
+        return checksum.getValue();
+    }
 }
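
The new of() helpers reset and reuse a per-thread Checksum, which is what lets call sites such as CompressedInputStream above drop their own Checksum fields. An illustrative snippet; the payload bytes are made up:

    byte[] payload = "example payload".getBytes(StandardCharsets.UTF_8);
    long crc   = ChecksumType.CRC32.of(payload, 0, payload.length);
    long adler = ChecksumType.Adler32.of(ByteBuffer.wrap(payload));
    // repeated calls on the same thread keep reusing one Checksum instance per enum constant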

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java 
b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index 38c008d..ad2404f 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
 public class BufferPool
 {
     /** The size of a page aligned buffer, 64KiB */
-    static final int CHUNK_SIZE = 64 << 10;
+    public static final int CHUNK_SIZE = 64 << 10;
 
     @VisibleForTesting
     public static long MEMORY_USAGE_THRESHOLD = 
DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/test/long/org/apache/cassandra/cql3/CachingBench.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/CachingBench.java 
b/test/long/org/apache/cassandra/cql3/CachingBench.java
new file mode 100644
index 0000000..370b3ff
--- /dev/null
+++ b/test/long/org/apache/cassandra/cql3/CachingBench.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.cache.ChunkCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CachingBench extends CQLTester
+{
+    private static final String STRATEGY = "LeveledCompactionStrategy";
+
+    private static final int DEL_SECTIONS = 1000;
+    private static final int FLUSH_FREQ = 10000;
+    private static final int SCAN_FREQUENCY_INV = 12000;
+    static final int COUNT = 29000;
+    static final int ITERS = 9;
+
+    static final int KEY_RANGE = 30;
+    static final int CLUSTERING_RANGE = 210000;
+
+    static final int EXTRA_SIZE = 1025;
+    static final boolean CONCURRENT_COMPACTIONS = true;
+
+    // The name of this method is important!
+    // CommitLog settings must be applied before CQLTester sets up; by using 
the same name as its @BeforeClass method we
+    // are effectively overriding it.
+    @BeforeClass
+    public static void setUpClass()
+    {
+        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(100);
+        CQLTester.setUpClass();
+    }
+    
+    String hashQuery;
+
+    @Before
+    public void before() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                    "  key int," +
+                    "  column int," +
+                    "  data int," +
+                    "  extra text," +
+                    "  PRIMARY KEY(key, column)" +
+                    ")"
+                   );
+
+        String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, 
int",
+                " CREATE FUNCTION %s (state int, val int)" +
+                " CALLED ON NULL INPUT" +
+                " RETURNS int" +
+                " LANGUAGE java" +
+                " AS 'return val != null ? state * 17 + val : state;'")).name;
+        String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, 
text",
+                " CREATE FUNCTION %s (state int, val text)" +
+                " CALLED ON NULL INPUT" +
+                " RETURNS int" +
+                " LANGUAGE java" +
+                " AS 'return val != null ? state * 17 + val.hashCode() : 
state;'")).name;
+
+        String hashInt = createAggregate(KEYSPACE, "int",
+                " CREATE AGGREGATE %s (int)" +
+                " SFUNC " + hashIFunc +
+                " STYPE int" +
+                " INITCOND 1");
+        String hashText = createAggregate(KEYSPACE, "text",
+                " CREATE AGGREGATE %s (text)" +
+                " SFUNC " + hashTFunc +
+                " STYPE int" +
+                " INITCOND 1");
+
+        hashQuery = String.format("SELECT count(column), %s(key), %s(column), 
%s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s",
+                                  hashInt, hashInt, hashInt, hashText);
+    }
+    AtomicLong id = new AtomicLong();
+    long compactionTimeNanos = 0;
+
+    void pushData(Random rand, int count) throws Throwable
+    {
+        for (int i = 0; i < count; ++i)
+        {
+            long ii = id.incrementAndGet();
+            if (ii % 1000 == 0)
+                System.out.print('.');
+            int key = rand.nextInt(KEY_RANGE);
+            int column = rand.nextInt(CLUSTERING_RANGE);
+            execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, 
?, ?)", key, column, (int) ii, genExtra(rand));
+            maybeCompact(ii);
+        }
+    }
+
+    private String genExtra(Random rand)
+    {
+        StringBuilder builder = new StringBuilder(EXTRA_SIZE);
+        for (int i = 0; i < EXTRA_SIZE; ++i)
+            builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1)));
+        return builder.toString();
+    }
+
+    void readAndDelete(Random rand, int count) throws Throwable
+    {
+        for (int i = 0; i < count; ++i)
+        {
+            int key;
+            UntypedResultSet res;
+            long ii = id.incrementAndGet();
+            if (ii % 1000 == 0)
+                System.out.print('-');
+            if (rand.nextInt(SCAN_FREQUENCY_INV) != 1)
+            {
+                do
+                {
+                    key = rand.nextInt(KEY_RANGE);
+                    long cid = rand.nextInt(DEL_SECTIONS);
+                    int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
+                    int cend = (int) ((cid + 1) * CLUSTERING_RANGE / 
DEL_SECTIONS);
+                    res = execute("SELECT column FROM %s WHERE key = ? AND 
column >= ? AND column < ? LIMIT 1", key, cstart, cend);
+                } while (res.size() == 0);
+                UntypedResultSet.Row r = Iterables.get(res, 
rand.nextInt(res.size()));
+                int clustering = r.getInt("column");
+                execute("DELETE FROM %s WHERE key = ? AND column = ?", key, 
clustering);
+            }
+            else
+            {
+                execute(hashQuery);
+            }
+            maybeCompact(ii);
+        }
+    }
+
+    private void maybeCompact(long ii)
+    {
+        if (ii % FLUSH_FREQ == 0)
+        {
+            System.out.print("F");
+            flush();
+            if (ii % (FLUSH_FREQ * 10) == 0)
+            {
+                System.out.println("C");
+                long startTime = System.nanoTime();
+                
getCurrentColumnFamilyStore().enableAutoCompaction(!CONCURRENT_COMPACTIONS);
+                long endTime = System.nanoTime();
+                compactionTimeNanos += endTime - startTime;
+                getCurrentColumnFamilyStore().disableAutoCompaction();
+            }
+        }
+    }
+
+    public void testSetup(String compactionClass, String compressorClass, 
DiskAccessMode mode, boolean cacheEnabled) throws Throwable
+    {
+        id.set(0);
+        compactionTimeNanos = 0;
+        ChunkCache.instance.enable(cacheEnabled);
+        DatabaseDescriptor.setDiskAccessMode(mode);
+        alterTable("ALTER TABLE %s WITH compaction = { 'class' :  '" + 
compactionClass + "'  };");
+        alterTable("ALTER TABLE %s WITH compression = { 'sstable_compression' 
: '" + compressorClass + "'  };");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        long onStartTime = System.currentTimeMillis();
+        ExecutorService es = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        List<Future<?>> tasks = new ArrayList<>();
+        for (int ti = 0; ti < 1; ++ti)
+        {
+            Random rand = new Random(ti);
+            tasks.add(es.submit(() -> 
+            {
+                for (int i = 0; i < ITERS; ++i)
+                    try
+                    {
+                        pushData(rand, COUNT);
+                        readAndDelete(rand, COUNT / 3);
+                    }
+                    catch (Throwable e)
+                    {
+                        throw new AssertionError(e);
+                    }
+            }));
+        }
+        for (Future<?> task : tasks)
+            task.get();
+
+        flush();
+        long onEndTime = System.currentTimeMillis();
+        int startRowCount = countRows(cfs);
+        int startTombCount = countTombstoneMarkers(cfs);
+        int startRowDeletions = countRowDeletions(cfs);
+        int startTableCount = cfs.getLiveSSTables().size();
+        long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+        System.out.println("\nCompession: " + 
cfs.getCompressionParameters().toString());
+        System.out.println("Reader " + 
cfs.getLiveSSTables().iterator().next().getFileDataInput(0).toString());
+        if (cacheEnabled)
+            System.out.format("Cache size %s requests %,d hit ratio %f\n",
+                
FileUtils.stringifyFileSize(ChunkCache.instance.metrics.size.getValue()),
+                ChunkCache.instance.metrics.requests.getCount(),
+                ChunkCache.instance.metrics.hitRate.getValue());
+        else
+        {
+            Assert.assertTrue("Chunk cache had requests: " + 
ChunkCache.instance.metrics.requests.getCount(), 
ChunkCache.instance.metrics.requests.getCount() < COUNT);
+            System.out.println("Cache disabled");
+        }
+        System.out.println(String.format("Operations completed in %.3fs", 
(onEndTime - onStartTime) * 1e-3));
+        if (!CONCURRENT_COMPACTIONS)
+            System.out.println(String.format(", out of which %.3f for 
non-concurrent compaction", compactionTimeNanos * 1e-9));
+        else
+            System.out.println();
+
+        String hashesBefore = getHashes();
+        long startTime = System.currentTimeMillis();
+        CompactionManager.instance.performMaximal(cfs, true);
+        long endTime = System.currentTimeMillis();
+
+        int endRowCount = countRows(cfs);
+        int endTombCount = countTombstoneMarkers(cfs);
+        int endRowDeletions = countRowDeletions(cfs);
+        int endTableCount = cfs.getLiveSSTables().size();
+        long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+
+        System.out.println(String.format("Major compaction completed in %.3fs",
+                (endTime - startTime) * 1e-3));
+        System.out.println(String.format("At start: %,12d tables %12s %,12d 
rows %,12d deleted rows %,12d tombstone markers",
+                startTableCount, FileUtils.stringifyFileSize(startSize), 
startRowCount, startRowDeletions, startTombCount));
+        System.out.println(String.format("At end:   %,12d tables %12s %,12d 
rows %,12d deleted rows %,12d tombstone markers",
+                endTableCount, FileUtils.stringifyFileSize(endSize), 
endRowCount, endRowDeletions, endTombCount));
+        String hashesAfter = getHashes();
+
+        Assert.assertEquals(hashesBefore, hashesAfter);
+    }
+
+    private String getHashes() throws Throwable
+    {
+        long startTime = System.currentTimeMillis();
+        String hashes = Arrays.toString(getRows(execute(hashQuery))[0]);
+        long endTime = System.currentTimeMillis();
+        System.out.println(String.format("Hashes: %s, retrieved in %.3fs", 
hashes, (endTime - startTime) * 1e-3));
+        return hashes;
+    }
+
+    @Test
+    public void testWarmup() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, false);
+    }
+
+    @Test
+    public void testLZ4CachedMmap() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, true);
+    }
+
+    @Test
+    public void testLZ4CachedStandard() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.standard, true);
+    }
+
+    @Test
+    public void testLZ4UncachedMmap() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, false);
+    }
+
+    @Test
+    public void testLZ4UncachedStandard() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.standard, false);
+    }
+
+    @Test
+    public void testCachedStandard() throws Throwable
+    {
+        testSetup(STRATEGY, "", DiskAccessMode.standard, true);
+    }
+
+    @Test
+    public void testUncachedStandard() throws Throwable
+    {
+        testSetup(STRATEGY, "", DiskAccessMode.standard, false);
+    }
+
+    @Test
+    public void testMmapped() throws Throwable
+    {
+        testSetup(STRATEGY, "", DiskAccessMode.mmap, false /* doesn't matter 
*/);
+    }
+
+    int countTombstoneMarkers(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRangeTombstoneMarker());
+    }
+
+    int countRowDeletions(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive());
+    }
+
+    int countRows(ColumnFamilyStore cfs)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec));
+    }
+
+    private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate)
+    {
+        int count = 0;
+        for (SSTableReader reader : cfs.getLiveSSTables())
+            count += count(reader, predicate);
+        return count;
+    }
+
+    int count(SSTableReader reader, Predicate<Unfiltered> predicate)
+    {
+        int instances = 0;
+        try (ISSTableScanner partitions = reader.getScanner())
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    while (iter.hasNext())
+                    {
+                        Unfiltered atom = iter.next();
+                        if (predicate.test(atom))
+                            ++instances;
+                    }
+                }
+            }
+        }
+        return instances;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 1b36c03..1e7d05f 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -108,9 +108,6 @@ public abstract class CQLTester
         }
         PROTOCOL_VERSIONS = builder.build();
 
-        // Once per-JVM is enough
-        prepareServer();
-
         nativeAddr = InetAddress.getLoopbackAddress();
 
         try
@@ -230,6 +227,9 @@ public abstract class CQLTester
             DatabaseDescriptor.setRowCacheSizeInMB(ROW_CACHE_SIZE_IN_MB);
 
         
StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+        // Once per-JVM is enough
+        prepareServer();
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java 
b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index 3c83da7..b6f8ada 100644
--- 
a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -34,6 +34,8 @@ public class SelectionColumnMappingTest extends CQLTester
     public static void setUpClass()
     {
         
DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+
+        prepareServer();
     }
 
     @Test
