pool [Compressed]RandomAccessReader objects on the partitioned read path, since creating them is expensive. Patch by jbellis; reviewed by xedin and slebresne for CASSANDRA-4942
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/edcc7f13 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/edcc7f13 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/edcc7f13 Branch: refs/heads/trunk Commit: edcc7f137f573eb6aa38fe4f5b79c22de7811342 Parents: 6773383 Author: Jonathan Ellis <[email protected]> Authored: Fri Nov 9 23:03:51 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Wed Nov 14 10:23:51 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 6 ++- .../io/compress/CompressedRandomAccessReader.java | 27 +++++++--- .../cassandra/io/util/BufferedSegmentedFile.java | 13 +---- .../cassandra/io/util/CompressedSegmentedFile.java | 10 ++-- .../cassandra/io/util/PoolingSegmentedFile.java | 41 +++++++++++++++ .../cassandra/io/util/RandomAccessReader.java | 41 ++++++++++++--- .../compress/CompressedRandomAccessReaderTest.java | 4 +- .../io/util/BufferedRandomAccessFileTest.java | 4 +- 8 files changed, 110 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 900c9ef..be34e89 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,11 @@ 1.2.1 - * Add debug logging to list filenames processed by Directories.migrateFile method (CASSANDRA-4939) + * pool [Compressed]RandomAccessReader objects on the partitioned read path + (CASSANDRA-4942) + * Add debug logging to list filenames processed by Directories.migrateFile + method (CASSANDRA-4939) * Expose black-listed directories via JMX (CASSANDRA-4848) + 1.2-rc1 * fix cqlsh DESCRIBE command (CASSANDRA-4913) * save truncation position in system table (CASSANDRA-4906) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index da35e92..3b0c5ba 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -25,26 +25,36 @@ import java.nio.channels.FileChannel; import java.util.zip.CRC32; import java.util.zip.Checksum; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.util.CompressedSegmentedFile; +import org.apache.cassandra.io.util.PoolingSegmentedFile; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.FBUtilities; // TODO refactor this to separate concept of "buffer to avoid lots of read() syscalls" and "compression buffer" public class CompressedRandomAccessReader extends RandomAccessReader { - public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata) + public static CompressedRandomAccessReader open(String path, CompressionMetadata metadata, CompressedSegmentedFile owner) { - return open(dataFilePath, metadata, false); + try + { + return new CompressedRandomAccessReader(path, metadata, false, owner); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } } - public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) + public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) { try { - return new 
CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache); + return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache, null); } catch (FileNotFoundException e) { @@ -65,9 +75,9 @@ public class CompressedRandomAccessReader extends RandomAccessReader private final FileInputStream source; private final FileChannel channel; - public CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws FileNotFoundException + private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException { - super(new File(dataFilePath), metadata.chunkLength(), skipIOCache); + super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner); this.metadata = metadata; compressed = new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]; // can't use super.read(...) methods @@ -155,9 +165,10 @@ public class CompressedRandomAccessReader extends RandomAccessReader } @Override - public void close() + public void deallocate() { - super.close(); + super.deallocate(); + try { source.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java index 2c8b89e..49972c8 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java @@ -19,7 +19,7 @@ package org.apache.cassandra.io.util; import java.io.File; -public class BufferedSegmentedFile extends SegmentedFile +public class BufferedSegmentedFile extends PoolingSegmentedFile { public BufferedSegmentedFile(String path, long length) { @@ -49,15 +49,8 @@ public class 
BufferedSegmentedFile extends SegmentedFile } } - public FileDataInput getSegment(long position) + protected RandomAccessReader createReader(String path) { - RandomAccessReader file = RandomAccessReader.open(new File(path)); - file.seek(position); - return file; - } - - public void cleanup() - { - // nothing to do + return RandomAccessReader.open(new File(path), this); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java index 7280dcd..e106be7 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java @@ -20,7 +20,7 @@ package org.apache.cassandra.io.util; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressionMetadata; -public class CompressedSegmentedFile extends SegmentedFile +public class CompressedSegmentedFile extends PoolingSegmentedFile { public final CompressionMetadata metadata; @@ -52,15 +52,15 @@ public class CompressedSegmentedFile extends SegmentedFile } } - public FileDataInput getSegment(long position) + protected RandomAccessReader createReader(String path) { - RandomAccessReader file = CompressedRandomAccessReader.open(path, metadata); - file.seek(position); - return file; + return CompressedRandomAccessReader.open(path, metadata, this); } + @Override public void cleanup() { + super.cleanup(); metadata.close(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java 
b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java new file mode 100644 index 0000000..2e0acfc --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java @@ -0,0 +1,41 @@ +package org.apache.cassandra.io.util; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public abstract class PoolingSegmentedFile extends SegmentedFile +{ + public final Queue<RandomAccessReader> pool = new ConcurrentLinkedQueue<RandomAccessReader>(); + + protected PoolingSegmentedFile(String path, long length) + { + super(path, length); + } + + protected PoolingSegmentedFile(String path, long length, long onDiskLength) + { + super(path, length, onDiskLength); + } + + public FileDataInput getSegment(long position) + { + RandomAccessReader reader = pool.poll(); + if (reader == null) + reader = createReader(path); + reader.seek(position); + return reader; + } + + protected abstract RandomAccessReader createReader(String path); + + public void recycle(RandomAccessReader reader) + { + pool.add(reader); + } + + public void cleanup() + { + for (RandomAccessReader reader : pool) + reader.deallocate(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index 06778d9..3210372 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -60,11 +60,14 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu private final long fileLength; - // used in tests - public RandomAccessReader(File file, int bufferSize, boolean skipIOCache) throws FileNotFoundException + protected final PoolingSegmentedFile owner; + + protected 
RandomAccessReader(File file, int bufferSize, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException { super(file, "r"); + this.owner = owner; + channel = super.getChannel(); filePath = file.getAbsolutePath(); @@ -101,17 +104,22 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu return open(file, false); } + public static RandomAccessReader open(File file, PoolingSegmentedFile owner) + { + return open(file, DEFAULT_BUFFER_SIZE, false, owner); + } + public static RandomAccessReader open(File file, boolean skipIOCache) { - return open(file, DEFAULT_BUFFER_SIZE, skipIOCache); + return open(file, DEFAULT_BUFFER_SIZE, skipIOCache, null); } @VisibleForTesting - static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache) + static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache, PoolingSegmentedFile owner) { try { - return new RandomAccessReader(file, bufferSize, skipIOCache); + return new RandomAccessReader(file, bufferSize, skipIOCache, owner); } catch (FileNotFoundException e) { @@ -120,9 +128,9 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu } @VisibleForTesting - public static RandomAccessReader open(SequentialWriter writer) + static RandomAccessReader open(SequentialWriter writer) { - return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, false); + return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, false, null); } /** @@ -237,7 +245,24 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu @Override public void close() { - buffer = null; + if (owner == null || buffer == null) + { + // The buffer == null check is so that if the pool owner has deallocated us, calling close() + // will re-call deallocate rather than recycling a deallocated object. 
+ // I'd be more comfortable if deallocate didn't have to handle being idempotent like that, + // but RandomAccessFile.close will call AbstractInterruptibleChannel.close which will + // re-call RAF.close -- in this case, [C]RAR.close since we are overriding that. + deallocate(); + } + else + { + owner.recycle(this); + } + } + + public void deallocate() + { + buffer = null; // makes sure we don't use this after it's ostensibly closed if (skipIOCache && bytesSinceCacheFlush > 0) CLibrary.trySkipCache(fd, 0, 0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index eabb489..830c3e1 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@ -75,8 +75,8 @@ public class CompressedRandomAccessReaderTest assert f.exists(); RandomAccessReader reader = compressed - ? new CompressedRandomAccessReader(filename, new CompressionMetadata(filename + ".metadata", f.length()), false) - : new RandomAccessReader(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false); + ? 
CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()), false) + : RandomAccessReader.open(f); String expected = "The quick brown fox jumps over the lazy dog"; assertEquals(expected.length(), reader.length()); byte[] b = new byte[expected.length()]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java index e7fa8e3..8059bbd 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java @@ -338,7 +338,7 @@ public class BufferedRandomAccessFileTest for (final int offset : Arrays.asList(0, 8)) { File file1 = writeTemporaryFile(new byte[16]); - final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, false); + final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, false, null); expectEOF(new Callable<Object>() { public Object call() throws IOException @@ -353,7 +353,7 @@ public class BufferedRandomAccessFileTest for (final int n : Arrays.asList(1, 2, 4, 8)) { File file1 = writeTemporaryFile(new byte[16]); - final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, false); + final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, false, null); expectEOF(new Callable<Object>() { public Object call() throws IOException
