Repository: cassandra Updated Branches: refs/heads/trunk ea2ee3703 -> aedce5fc6
Fix regression with compressed reader performance due to no pooling and excessive mapping/unmapping patch by benedict; reviewed by tjake for CASSANDRA-9240 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aedce5fc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aedce5fc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aedce5fc Branch: refs/heads/trunk Commit: aedce5fc6ba46ca734e91190cfaaeb23ba47a846 Parents: ea2ee37 Author: Benedict Elliott Smith <[email protected]> Authored: Thu May 7 11:31:08 2015 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Thu May 7 11:31:08 2015 +0100 ---------------------------------------------------------------------- .../compress/CompressedRandomAccessReader.java | 94 ++++++-------------- .../io/compress/CompressedThrottledReader.java | 9 +- .../io/compress/DeflateCompressor.java | 7 +- .../cassandra/io/compress/LZ4Compressor.java | 6 +- .../cassandra/io/compress/SnappyCompressor.java | 2 + .../io/util/CompressedPoolingSegmentedFile.java | 39 ++++++-- .../io/util/CompressedSegmentedFile.java | 71 ++++++++++++++- .../cassandra/io/util/ICompressedFile.java | 5 ++ .../cassandra/io/util/RandomAccessReader.java | 4 +- .../apache/cassandra/io/util/SegmentedFile.java | 2 +- 10 files changed, 152 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index edf8c68..1febe37 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -42,17 +42,23 @@ import org.apache.cassandra.utils.FBUtilities; */ public class CompressedRandomAccessReader extends RandomAccessReader { - private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap; - public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata) { - return open(channel, metadata, null); + try + { + return new CompressedRandomAccessReader(channel, metadata, null); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } } - public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata, CompressedPoolingSegmentedFile owner) + + public static CompressedRandomAccessReader open(ICompressedFile file) { try { - return new CompressedRandomAccessReader(channel, metadata, owner); + return new CompressedRandomAccessReader(file.channel(), file.getMetadata(), file); } catch (FileNotFoundException e) { @@ -60,9 +66,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader } } - - private TreeMap<Long, MappedByteBuffer> chunkSegments; - private int MAX_SEGMENT_SIZE = Integer.MAX_VALUE; + private final TreeMap<Long, MappedByteBuffer> chunkSegments; private final CompressionMetadata metadata; @@ -75,61 +79,24 @@ public class CompressedRandomAccessReader extends RandomAccessReader // raw checksum bytes private ByteBuffer checksumBytes; - protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException + protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file) throws FileNotFoundException { - super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), owner); + super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null); this.metadata = metadata; checksum = new Adler32(); - if (!useMmap) + chunkSegments = file == null ? null : file.chunkSegments(); + if (chunkSegments == null) { - compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]); + compressed = super.allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().useDirectOutputByteBuffers()); checksumBytes = ByteBuffer.wrap(new byte[4]); } - else - { - try - { - createMappedSegments(); - } - catch (IOException e) - { - throw new IOError(e); - } - } - } - - private void createMappedSegments() throws IOException - { - chunkSegments = new TreeMap<>(); - long offset = 0; - long lastSegmentOffset = 0; - long segmentSize = 0; - - while (offset < metadata.dataLength) - { - CompressionMetadata.Chunk chunk = metadata.chunkFor(offset); - - //Reached a new mmap boundary - if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE) - { - chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize)); - lastSegmentOffset += segmentSize; - segmentSize = 0; - } - - segmentSize += chunk.length + 4; //checksum - offset += metadata.chunkLength(); - } - - if (segmentSize > 0) - chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize)); } protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect) { assert Integer.bitCount(bufferSize) == 1; - return useMmap && useDirect + return useDirect ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize); } @@ -138,16 +105,9 @@ public class CompressedRandomAccessReader extends RandomAccessReader public void deallocate() { super.deallocate(); - - if (chunkSegments != null) - { - for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet()) - { - FileUtils.clean(entry.getValue()); - } - } - - chunkSegments = null; + if (compressed != null) + FileUtils.clean(compressed); + compressed = null; } private void reBufferStandard() @@ -175,7 +135,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader int decompressedBytes; try { - decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0); + decompressedBytes = metadata.compressor().uncompress(compressed, buffer); buffer.limit(decompressedBytes); } catch (IOException e) @@ -186,8 +146,8 @@ public class CompressedRandomAccessReader extends RandomAccessReader if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) { - - checksum.update(compressed.array(), 0, chunk.length); + compressed.position(0); + FBUtilities.directCheckSum(checksum, compressed); if (checksum(chunk) != (int) checksum.getValue()) throw new CorruptBlockException(getPath(), chunk); @@ -226,7 +186,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset); long segmentOffset = entry.getKey(); int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset); - MappedByteBuffer compressedChunk = entry.getValue(); + ByteBuffer compressedChunk = entry.getValue().duplicate(); compressedChunk.position(chunkOffset); compressedChunk.limit(chunkOffset + chunk.length); @@ -284,7 +244,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader @Override protected void reBuffer() { - if (useMmap) + if (chunkSegments != null) { reBufferMmap(); } @@ -305,7 +265,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader public int getTotalBufferSize() { - return super.getTotalBufferSize() + (useMmap ? 0 : compressed.capacity()); + return super.getTotalBufferSize() + (chunkSegments != null ? 0 : compressed.capacity()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java index 63727d8..a29129c 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java @@ -26,14 +26,15 @@ import java.io.FileNotFoundException; import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.ICompressedFile; public class CompressedThrottledReader extends CompressedRandomAccessReader { private final RateLimiter limiter; - public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, RateLimiter limiter) throws FileNotFoundException + public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter) throws FileNotFoundException { - super(channel, metadata, null); + super(channel, metadata, file); this.limiter = limiter; } @@ -43,11 +44,11 @@ public class CompressedThrottledReader extends CompressedRandomAccessReader super.reBuffer(); } - public static CompressedThrottledReader open(ChannelProxy channel, CompressionMetadata metadata, RateLimiter limiter) + public static CompressedThrottledReader open(ICompressedFile file, RateLimiter limiter) { try { - return new CompressedThrottledReader(channel, metadata, limiter); + return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter); } catch (FileNotFoundException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java index a88e4d2..833c375 100644 --- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java +++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java @@ -122,13 +122,14 @@ public class DeflateCompressor implements ICompressor } } - public int uncompress(ByteBuffer input_, ByteBuffer output) throws IOException + public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException { if (!output.hasArray()) throw new IllegalArgumentException("DeflateCompressor doesn't work with direct byte buffers"); - byte[] input = ByteBufferUtil.getArray(input_); - return uncompress(input, 0, input.length, output.array(), output.arrayOffset() + output.position()); + if (input.hasArray()) + return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position()); + return uncompress(ByteBufferUtil.getArray(input), 0, input.remaining(), output.array(), output.arrayOffset() + output.position()); } public boolean useDirectOutputByteBuffers() http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java index ab10a00..9d54048 100644 --- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java +++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java @@ -83,6 +83,7 @@ public class LZ4Compressor implements ICompressor | ((input[inputOffset + 1] & 0xFF) << 8) | ((input[inputOffset + 2] & 0xFF) << 16) | ((input[inputOffset + 3] & 0xFF) << 24); + final int compressedLength; try { @@ -104,6 +105,9 @@ public class LZ4Compressor implements ICompressor public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException { + if (input.hasArray() && output.hasArray()) + return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position()); + int pos = input.position(); final int decompressedLength = (input.get(pos) & 0xFF) | ((input.get(pos + 1) & 0xFF) << 8) @@ -132,7 +136,7 @@ public class LZ4Compressor implements ICompressor @Override public boolean useDirectOutputByteBuffers() { - return false; + return true; } public Set<String> supportedOptions() http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java index d1f1f34..04f676b 100644 --- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java +++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java @@ -96,6 +96,8 @@ public class SnappyCompressor implements ICompressor public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException { + if (input.hasArray() && output.hasArray()) + return Snappy.rawUncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position()); return Snappy.uncompress(input, output); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java index 502c461..cb30131 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java @@ -17,6 +17,10 @@ */ package org.apache.cassandra.io.util; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.TreeMap; + import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; @@ -27,31 +31,56 @@ import org.apache.cassandra.io.compress.CompressionMetadata; public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile { public final CompressionMetadata metadata; + private final TreeMap<Long, MappedByteBuffer> chunkSegments; public CompressedPoolingSegmentedFile(ChannelProxy channel, CompressionMetadata metadata) { - super(new Cleanup(channel, metadata), channel, metadata.dataLength, metadata.compressedFileLength); + this(channel, metadata, CompressedSegmentedFile.createMappedSegments(channel, metadata)); + } + + private CompressedPoolingSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments) + { + super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength); this.metadata = metadata; + this.chunkSegments = chunkSegments; } private CompressedPoolingSegmentedFile(CompressedPoolingSegmentedFile copy) { super(copy); this.metadata = copy.metadata; + this.chunkSegments = copy.chunkSegments; + } + + public ChannelProxy channel() + { + return channel; + } + + public TreeMap<Long, MappedByteBuffer> chunkSegments() + { + return chunkSegments; } protected static final class Cleanup extends PoolingSegmentedFile.Cleanup { final CompressionMetadata metadata; - protected Cleanup(ChannelProxy channel, CompressionMetadata metadata) + final TreeMap<Long, MappedByteBuffer> chunkSegments; + protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments) { super(channel); this.metadata = metadata; + this.chunkSegments = chunkSegments; } public void tidy() { super.tidy(); metadata.close(); + if (chunkSegments != null) + { + for (MappedByteBuffer segment : chunkSegments.values()) + FileUtils.clean(segment); + } } } @@ -82,17 +111,17 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme public RandomAccessReader createReader() { - return CompressedRandomAccessReader.open(channel, metadata, null); + return CompressedRandomAccessReader.open(this); } public RandomAccessReader createThrottledReader(RateLimiter limiter) { - return CompressedThrottledReader.open(channel, metadata, limiter); + return CompressedThrottledReader.open(this, limiter); } protected RandomAccessReader createPooledReader() { - return CompressedRandomAccessReader.open(channel, metadata, this); + return CompressedRandomAccessReader.open(this); } public CompressionMetadata getMetadata() http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java index 5d2c897..caf4c22 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java @@ -17,8 +17,14 @@ */ package org.apache.cassandra.io.util; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.TreeMap; + import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.CompressedThrottledReader; @@ -27,31 +33,88 @@ import org.apache.cassandra.io.compress.CompressionMetadata; public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile { public final CompressionMetadata metadata; + private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap; + private static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE; + private final TreeMap<Long, MappedByteBuffer> chunkSegments; public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata) { - super(new Cleanup(channel, metadata), channel, metadata.dataLength, metadata.compressedFileLength); + this(channel, metadata, createMappedSegments(channel, metadata)); + } + + public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments) + { + super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength); this.metadata = metadata; + this.chunkSegments = chunkSegments; } private CompressedSegmentedFile(CompressedSegmentedFile copy) { super(copy); this.metadata = copy.metadata; + this.chunkSegments = copy.chunkSegments; + } + + public ChannelProxy channel() + { + return channel; + } + + public TreeMap<Long, MappedByteBuffer> chunkSegments() + { + return chunkSegments; + } + + static TreeMap<Long, MappedByteBuffer> createMappedSegments(ChannelProxy channel, CompressionMetadata metadata) + { + if (!useMmap) + return null; + TreeMap<Long, MappedByteBuffer> chunkSegments = new TreeMap<>(); + long offset = 0; + long lastSegmentOffset = 0; + long segmentSize = 0; + + while (offset < metadata.dataLength) + { + CompressionMetadata.Chunk chunk = metadata.chunkFor(offset); + + //Reached a new mmap boundary + if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE) + { + chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize)); + lastSegmentOffset += segmentSize; + segmentSize = 0; + } + + segmentSize += chunk.length + 4; //checksum + offset += metadata.chunkLength(); + } + + if (segmentSize > 0) + chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize)); + return chunkSegments; } private static final class Cleanup extends SegmentedFile.Cleanup { final CompressionMetadata metadata; - protected Cleanup(ChannelProxy channel, CompressionMetadata metadata) + final TreeMap<Long, MappedByteBuffer> chunkSegments; + protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments) { super(channel); this.metadata = metadata; + this.chunkSegments = chunkSegments; } public void tidy() { super.tidy(); metadata.close(); + if (chunkSegments != null) + { + for (MappedByteBuffer segment : chunkSegments.values()) + FileUtils.clean(segment); + } } } @@ -97,12 +160,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse public RandomAccessReader createReader() { - return CompressedRandomAccessReader.open(channel, metadata); + return CompressedRandomAccessReader.open(this); } public RandomAccessReader createThrottledReader(RateLimiter limiter) { - return CompressedThrottledReader.open(channel, metadata, limiter); + return CompressedThrottledReader.open(this, limiter); } public CompressionMetadata getMetadata() http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/ICompressedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java index 3ca7718..ce7b22c 100644 --- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java +++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java @@ -17,9 +17,14 @@ */ package org.apache.cassandra.io.util; +import java.nio.MappedByteBuffer; +import java.util.TreeMap; + import org.apache.cassandra.io.compress.CompressionMetadata; public interface ICompressedFile { + public ChannelProxy channel(); public CompressionMetadata getMetadata(); + public TreeMap<Long, MappedByteBuffer> chunkSegments(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index 87ba677..328095b 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -65,8 +65,8 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp { int size = (int) Math.min(fileLength, bufferSize); return useDirectBuffer - ? ByteBuffer.allocate(size) - : ByteBuffer.allocateDirect(size); + ? ByteBuffer.allocateDirect(size) + : ByteBuffer.allocate(size); } public static RandomAccessReader open(ChannelProxy channel, long overrideSize, PoolingSegmentedFile owner) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index 129d914..cb4d132 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -133,7 +133,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl */ public static Builder getBuilder(Config.DiskAccessMode mode, boolean compressed) { - return compressed ? new CompressedSegmentedFile.Builder(null) + return compressed ? new CompressedPoolingSegmentedFile.Builder(null) : mode == Config.DiskAccessMode.mmap ? new MmappedSegmentedFile.Builder() : new BufferedPoolingSegmentedFile.Builder(); }
