Updated Branches: refs/heads/cassandra-1.2 dd184c430 -> 27ed655fd
Change Kernel Page Cache skipping into row preheating (disabled by default) patch by Pavel Yaskevich; reviewed by Jonathan Ellis and Yuki Morishita for CASSANDRA-4937 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/27ed655f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/27ed655f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/27ed655f Branch: refs/heads/cassandra-1.2 Commit: 27ed655fd0552055bd1c26c62c098c16501bc32b Parents: dd184c4 Author: Pavel Yaskevich <[email protected]> Authored: Wed Mar 20 14:45:13 2013 -0700 Committer: Pavel Yaskevich <[email protected]> Committed: Thu Mar 21 14:43:25 2013 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + conf/cassandra.yaml | 6 + src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 5 + .../cassandra/db/commitlog/CommitLogReplayer.java | 2 +- .../db/compaction/AbstractCompactionStrategy.java | 2 +- .../cassandra/db/compaction/CompactionManager.java | 2 +- .../cassandra/db/compaction/CompactionTask.java | 5 +- .../db/compaction/LeveledCompactionStrategy.java | 6 +- .../apache/cassandra/db/compaction/Scrubber.java | 4 +- .../io/compress/CompressedRandomAccessReader.java | 10 +- .../apache/cassandra/io/sstable/KeyIterator.java | 2 +- .../io/sstable/SSTableBoundedScanner.java | 4 +- .../apache/cassandra/io/sstable/SSTableReader.java | 90 +++++++++++++-- .../cassandra/io/sstable/SSTableScanner.java | 10 +- .../cassandra/io/util/RandomAccessReader.java | 54 ++------- .../apache/cassandra/streaming/FileStreamTask.java | 2 +- .../compress/CompressedFileStreamTask.java | 2 +- .../org/apache/cassandra/tools/SSTableExport.java | 4 +- src/java/org/apache/cassandra/utils/CLibrary.java | 21 ++++ .../unit/org/apache/cassandra/db/KeyCacheTest.java | 4 +- .../compress/CompressedRandomAccessReaderTest.java | 8 +- 
.../apache/cassandra/io/sstable/SSTableTest.java | 4 +- .../apache/cassandra/io/sstable/SSTableUtils.java | 4 +- .../io/util/BufferedRandomAccessFileTest.java | 4 +- 25 files changed, 161 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0959ac9..e309eae 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,8 @@ * Improve asynchronous hint delivery (CASSANDRA-5179) * Fix Guava dependency version (12.0 -> 13.0.1) for Maven (CASSANDRA-5364) * Validate that provided CQL3 collection value are < 64K (CASSANDRA-5355) + * Change Kernel Page Cache skipping into row preheating (disabled by default) + (CASSANDRA-4937) 1.2.3 http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 0a8102d..178487d 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -684,3 +684,9 @@ internode_compression: all # reducing overhead from the TCP protocol itself, at the cost of increasing # latency if you block for cross-datacenter responses. inter_dc_tcp_nodelay: true + +# Enable or disable kernel page cache preheating from the contents of the key cache after compaction. +# When enabled, it preheats only the first "page" (4KB) of each row to optimize +# for sequential access. Note: this could be harmful for fat rows; see CASSANDRA-4937 +# for further details on that topic.
+preheat_kernel_page_cache: false http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 02324ee..212147a 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -167,6 +167,8 @@ public class Config public boolean inter_dc_tcp_nodelay = true; + public boolean preheat_kernel_page_cache = false; + private static boolean loadYaml = true; private static boolean outboundBindAny = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 2d3cbb5..5fcef07 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1275,4 +1275,9 @@ public class DatabaseDescriptor { return conf.inter_dc_tcp_nodelay; } + + public static boolean shouldPreheatPageCache() + { + return conf.preheat_kernel_page_cache; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 2728970..8dcdaad 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -121,7 +121,7 @@ public class CommitLogReplayer CommitLogDescriptor desc = 
CommitLogDescriptor.fromFileName(file.getName()); final long segment = desc.id; int version = desc.getMessagingVersion(); - RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true); + RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath())); try { assert reader.length() <= Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index cb15109..85b09c1 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -152,7 +152,7 @@ public abstract class AbstractCompactionStrategy { ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(); for (SSTableReader sstable : sstables) - scanners.add(sstable.getDirectScanner(range)); + scanners.add(sstable.getScanner(range)); return scanners; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 14e1b13..0fe3a7a 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -568,7 +568,7 @@ public class CompactionManager implements CompactionManagerMBean if (compactionFileLocation == null) throw new IOException("disk full"); - SSTableScanner scanner = sstable.getDirectScanner(); + SSTableScanner scanner = 
sstable.getScanner(); long rowsRead = 0; List<IColumn> indexedColumnsInRow = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 75ea1cb..8c2af4d 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -231,10 +231,7 @@ public class CompactionTask extends AbstractCompactionTask cfs.replaceCompactedSSTables(toCompact, sstables, compactionType); // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up for (SSTableReader sstable : sstables) - { - for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeyMap.get(sstable.descriptor).entrySet()) - sstable.cacheKey(entry.getKey(), entry.getValue()); - } + sstable.preheat(cachedKeyMap.get(sstable.descriptor)); if (logger.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 6a1bf4b..d916c48 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -179,7 +179,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem { // L0 makes no guarantees about overlapping-ness. 
Just create a direct scanner for each for (SSTableReader sstable : byLevel.get(level)) - scanners.add(sstable.getDirectScanner(range)); + scanners.add(sstable.getScanner(range)); } else { @@ -209,7 +209,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem this.sstables = new ArrayList<SSTableReader>(sstables); Collections.sort(this.sstables, SSTable.sstableComparator); sstableIterator = this.sstables.iterator(); - currentScanner = sstableIterator.next().getDirectScanner(range); + currentScanner = sstableIterator.next().getScanner(range); long length = 0; for (SSTableReader sstable : sstables) @@ -234,7 +234,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem currentScanner = null; return endOfData(); } - currentScanner = sstableIterator.next().getDirectScanner(range); + currentScanner = sstableIterator.next().getScanner(range); } } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 0601857..30929cc 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -94,8 +94,8 @@ public class Scrubber implements Closeable // we'll also loop through the index at the same time, using the position from the index to recover if the // row header (key or data size) is corrupt. (This means our position in the index file will be one row // "ahead" of the data file.) 
- this.dataFile = sstable.openDataReader(true); - this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true); + this.dataFile = sstable.openDataReader(); + this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))); this.scrubInfo = new ScrubInfo(dataFile, sstable); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index bbd2466..aa686c2 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -42,7 +42,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader { try { - return new CompressedRandomAccessReader(path, metadata, false, owner); + return new CompressedRandomAccessReader(path, metadata, owner); } catch (FileNotFoundException e) { @@ -50,11 +50,11 @@ public class CompressedRandomAccessReader extends RandomAccessReader } } - public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) + public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata) { try { - return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache, null); + return new CompressedRandomAccessReader(dataFilePath, metadata, null); } catch (FileNotFoundException e) { @@ -73,9 +73,9 @@ public class CompressedRandomAccessReader extends RandomAccessReader // raw checksum bytes private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]); - private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata 
metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException + private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException { - super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner); + super(new File(dataFilePath), metadata.chunkLength(), owner); this.metadata = metadata; compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/io/sstable/KeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java index e581b22..9fe1309 100644 --- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java @@ -38,7 +38,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close { this.desc = desc; File path = new File(desc.filenameFor(SSTable.COMPONENT_INDEX)); - in = RandomAccessReader.open(path, true); + in = RandomAccessReader.open(path); } protected DecoratedKey computeNext() http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java index a571901..d5bec82 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java @@ -35,9 +35,9 @@ public class SSTableBoundedScanner extends SSTableScanner private final Iterator<Pair<Long, Long>> rangeIterator; private Pair<Long, Long> 
currentRange; - SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Iterator<Pair<Long, Long>> rangeIterator) + SSTableBoundedScanner(SSTableReader sstable, Iterator<Pair<Long, Long>> rangeIterator) { - super(sstable, skipCache); + super(sstable); this.rangeIterator = rangeIterator; assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise currentRange = rangeIterator.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 51f2344..dbc45d8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -352,7 +352,7 @@ public class SSTableReader extends SSTable : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. 
- RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); + RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); // try to load summaries from the disk and check if we need // to read primary index because we should re-create a BloomFilter or pre-load KeyCache @@ -698,6 +698,29 @@ public class SSTableReader extends SSTable keyCache.put(cacheKey, info); } + public void preheat(Map<DecoratedKey, RowIndexEntry> cachedKeys) throws IOException + { + RandomAccessFile f = new RandomAccessFile(getFilename(), "r"); + + try + { + int fd = CLibrary.getfd(f.getFD()); + + for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeys.entrySet()) + { + cacheKey(entry.getKey(), entry.getValue()); + + // add to the cache but don't do actual preheating if we have it disabled in the config + if (DatabaseDescriptor.shouldPreheatPageCache() && fd > 0) + CLibrary.preheatPage(fd, entry.getValue().position); + } + } + finally + { + FileUtils.closeQuietly(f); + } + } + public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats) { return getCachedPosition(new KeyCacheKey(descriptor, key.key), updateStats); @@ -897,6 +920,15 @@ public class SSTableReader extends SSTable { if (references.decrementAndGet() == 0 && isCompacted.get()) { + /** + * Do the OS a favour and suggest (using an fadvise call) that we + * don't want to see pages of this SSTable in memory anymore. + * + * NOTE: We can't use madvise in Java because it requires the address of + * the mapping, so instead we always open the file and run fadvise(fd, 0, 0) on it + */ + dropPageCache(); + // Force finalizing mmapping if necessary ifile.cleanup(); dfile.cleanup(); @@ -948,12 +980,12 @@ public class SSTableReader extends SSTable } /** - * Direct I/O SSTableScanner + * Creates an SSTableScanner over this SSTable. * @return A Scanner for seeking over the rows of the SSTable.
*/ - public SSTableScanner getDirectScanner() + public SSTableScanner getScanner() { - return new SSTableScanner(this, true); + return new SSTableScanner(this); } /** @@ -962,14 +994,14 @@ public class SSTableReader extends SSTable * @param range the range of keys to cover * @return A Scanner for seeking over the rows of the SSTable. */ - public ICompactionScanner getDirectScanner(Range<Token> range) + public ICompactionScanner getScanner(Range<Token> range) { if (range == null) - return getDirectScanner(); + return getScanner(); Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator(); return rangeIterator.hasNext() - ? new SSTableBoundedScanner(this, true, rangeIterator) + ? new SSTableBoundedScanner(this, rangeIterator) : new EmptyCompactionScanner(getFilename()); } @@ -1122,16 +1154,16 @@ public class SSTableReader extends SSTable return sstableMetadata.ancestors; } - public RandomAccessReader openDataReader(boolean skipIOCache) + public RandomAccessReader openDataReader() { return compression - ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata(), skipIOCache) - : RandomAccessReader.open(new File(getFilename()), skipIOCache); + ? 
CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata()) + : RandomAccessReader.open(new File(getFilename())); } - public RandomAccessReader openIndexReader(boolean skipIOCache) + public RandomAccessReader openIndexReader() { - return RandomAccessReader.open(new File(getIndexFilename()), skipIOCache); + return RandomAccessReader.open(new File(getIndexFilename())); } /** @@ -1222,4 +1254,38 @@ public class SSTableReader extends SSTable throw new UnsupportedOperationException(); } } + + private void dropPageCache() + { + dropPageCache(dfile.path); + dropPageCache(ifile.path); + } + + private void dropPageCache(String filePath) + { + RandomAccessFile file = null; + + try + { + file = new RandomAccessFile(filePath, "r"); + + int fd = CLibrary.getfd(file.getFD()); + + if (fd > 0) + { + if (logger.isDebugEnabled()) + logger.debug(String.format("Dropping page cache of file %s.", filePath)); + + CLibrary.trySkipCache(fd, 0, 0); + } + } + catch (IOException e) + { + // we don't care if cache cleanup fails + } + finally + { + FileUtils.closeQuietly(file); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 22ac485..949acda 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -46,10 +46,10 @@ public class SSTableScanner implements ICompactionScanner /** * @param sstable SSTable to scan. 
*/ - SSTableScanner(SSTableReader sstable, boolean skipCache) + SSTableScanner(SSTableReader sstable) { - this.dfile = sstable.openDataReader(skipCache); - this.ifile = sstable.openIndexReader(skipCache); + this.dfile = sstable.openDataReader(); + this.ifile = sstable.openIndexReader(); this.sstable = sstable; this.filter = null; } @@ -60,8 +60,8 @@ public class SSTableScanner implements ICompactionScanner */ SSTableScanner(SSTableReader sstable, QueryFilter filter) { - this.dfile = sstable.openDataReader(false); - this.ifile = sstable.openIndexReader(false); + this.dfile = sstable.openDataReader(); + this.ifile = sstable.openIndexReader(); this.sstable = sstable; this.filter = filter; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index 3210372..4d7bfbb 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -50,19 +50,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu // channel liked with the file, used to retrieve data and force updates. 
protected final FileChannel channel; - private final boolean skipIOCache; - - // file descriptor - private final int fd; - - // used if skip I/O cache was enabled - private long bytesSinceCacheFlush = 0; - private final long fileLength; protected final PoolingSegmentedFile owner; - protected RandomAccessReader(File file, int bufferSize, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException + protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException { super(file, "r"); @@ -74,18 +66,8 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu // allocating required size of the buffer if (bufferSize <= 0) throw new IllegalArgumentException("bufferSize must be positive"); - buffer = new byte[bufferSize]; - this.skipIOCache = skipIOCache; - try - { - fd = CLibrary.getfd(getFD()); - } - catch (IOException e) - { - // fd == null, Not Supposed To Happen - throw new RuntimeException(e); - } + buffer = new byte[bufferSize]; // we can cache file length in read-only mode try @@ -99,27 +81,22 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations } - public static RandomAccessReader open(File file) - { - return open(file, false); - } - public static RandomAccessReader open(File file, PoolingSegmentedFile owner) { - return open(file, DEFAULT_BUFFER_SIZE, false, owner); + return open(file, DEFAULT_BUFFER_SIZE, owner); } - public static RandomAccessReader open(File file, boolean skipIOCache) + public static RandomAccessReader open(File file) { - return open(file, DEFAULT_BUFFER_SIZE, skipIOCache, null); + return open(file, DEFAULT_BUFFER_SIZE, null); } @VisibleForTesting - static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache, PoolingSegmentedFile owner) + static RandomAccessReader open(File file, int bufferSize, PoolingSegmentedFile 
owner) { try { - return new RandomAccessReader(file, bufferSize, skipIOCache, owner); + return new RandomAccessReader(file, bufferSize, owner); } catch (FileNotFoundException e) { @@ -130,7 +107,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu @VisibleForTesting static RandomAccessReader open(SequentialWriter writer) { - return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, false, null); + return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, null); } /** @@ -158,21 +135,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu } validBufferBytes = read; - bytesSinceCacheFlush += read; } catch (IOException e) { throw new FSReadError(e, filePath); } - - if (skipIOCache && bytesSinceCacheFlush >= CACHE_FLUSH_INTERVAL_IN_BYTES) - { - // with random I/O we can't control what we are skipping so - // it will be more appropriate to just skip a whole file after - // we reach threshold - CLibrary.trySkipCache(this.fd, 0, 0); - bytesSinceCacheFlush = 0; - } } @Override @@ -264,9 +231,6 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu { buffer = null; // makes sure we don't use this after it's ostensibly closed - if (skipIOCache && bytesSinceCacheFlush > 0) - CLibrary.trySkipCache(fd, 0, 0); - try { super.close(); @@ -280,7 +244,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu @Override public String toString() { - return getClass().getSimpleName() + "(" + "filePath='" + filePath + "'" + ", skipIOCache=" + skipIOCache + ")"; + return getClass().getSimpleName() + "(" + "filePath='" + filePath + "')"; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/streaming/FileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java 
b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index 67d5c35..979b2e1 100644 --- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java @@ -139,7 +139,7 @@ public class FileStreamTask extends WrappedRunnable return; // try to skip kernel page cache if possible - RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()), true); + RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename())); // setting up data compression stream compressedoutput = new LZFOutputStream(output); http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java index dda9d7d..c1818ed 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java @@ -65,7 +65,7 @@ public class CompressedFileStreamTask extends FileStreamTask ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header, false, MessagingService.instance().getVersion(to)); socket.getOutputStream().write(ByteBufferUtil.getArray(headerBuffer)); - RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()), true); + RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename())); FileChannel fc = file.getChannel(); StreamingMetrics.activeStreamsOutbound.inc(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java index 51cdc72..2e117ef 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExport.java +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@ -349,7 +349,7 @@ public class SSTableExport public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException { SSTableReader reader = SSTableReader.open(desc); - SSTableScanner scanner = reader.getDirectScanner(); + SSTableScanner scanner = reader.getScanner(); IPartitioner<?> partitioner = reader.partitioner; @@ -406,7 +406,7 @@ public class SSTableExport SSTableIdentityIterator row; - SSTableScanner scanner = reader.getDirectScanner(); + SSTableScanner scanner = reader.getScanner(); outs.println("["); http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/src/java/org/apache/cassandra/utils/CLibrary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java index f8ad9d1..2f6e088 100644 --- a/src/java/org/apache/cassandra/utils/CLibrary.java +++ b/src/java/org/apache/cassandra/utils/CLibrary.java @@ -326,4 +326,25 @@ public final class CLibrary return -1; } + + /** + * Advise the kernel to preheat (read ahead) one page of the given file. + * + * @param fd The file descriptor of the file to preheat. + * @param position The offset of the page to preheat. + * + * @return On success, zero is returned. On error, an error number is returned, or -1 if the native call is unavailable.
+ */ + public static int preheatPage(int fd, long position) + { + try + { + // 4096 is good for SSD because they operate on "Pages" 4KB in size + return posix_fadvise(fd, position, 4096, POSIX_FADV_WILLNEED); + } + catch (UnsatisfiedLinkError e) + { + return -1; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index 93f1fea..b05a607 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -130,13 +130,13 @@ public class KeyCacheTest extends SchemaLoader false, 10)); - assert CacheService.instance.keyCache.size() == 2; + assertEquals(2, CacheService.instance.keyCache.size()); Util.compactAll(cfs).get(); keyCacheSize = CacheService.instance.keyCache.size(); // after compaction cache should have entries for // new SSTables, if we had 2 keys in cache previously it should become 4 - assert keyCacheSize == 4 : keyCacheSize; + assertEquals(4, keyCacheSize); // re-read same keys to verify that key cache didn't grow further cfs.getColumnFamily(QueryFilter.getSliceFilter(key1, http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index 830c3e1..437b778 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@ -75,7 +75,7 @@ public class CompressedRandomAccessReaderTest assert f.exists(); 
RandomAccessReader reader = compressed - ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()), false) + ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length())) : RandomAccessReader.open(f); String expected = "The quick brown fox jumps over the lazy dog"; assertEquals(expected.length(), reader.length()); @@ -115,7 +115,7 @@ public class CompressedRandomAccessReaderTest CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length()); CompressionMetadata.Chunk chunk = meta.chunkFor(0); - RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta, false); + RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta); // read and verify compressed data assertEquals(CONTENT, reader.readLine()); // close reader @@ -142,7 +142,7 @@ public class CompressedRandomAccessReaderTest checksumModifier.write(random.nextInt()); checksumModifier.getFD().sync(); // making sure that change was synced with disk - final RandomAccessReader r = CompressedRandomAccessReader.open(file.getPath(), meta, false); + final RandomAccessReader r = CompressedRandomAccessReader.open(file.getPath(), meta); Throwable exception = null; try @@ -163,7 +163,7 @@ public class CompressedRandomAccessReaderTest // lets write original checksum and check if we can read data updateChecksum(checksumModifier, chunk.length, checksum); - reader = CompressedRandomAccessReader.open(file.getPath(), meta, false); + reader = CompressedRandomAccessReader.open(file.getPath(), meta); // read and verify compressed data assertEquals(CONTENT, reader.readLine()); // close reader http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java index 12f2747..e944ed2 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java @@ -56,7 +56,7 @@ public class SSTableTest extends SchemaLoader private void verifySingle(SSTableReader sstable, ByteBuffer bytes, ByteBuffer key) throws IOException { - RandomAccessReader file = sstable.openDataReader(false); + RandomAccessReader file = sstable.openDataReader(); file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position); assert key.equals(ByteBufferUtil.readWithShortLength(file)); int size = (int)SSTableReader.readRowSize(file, sstable.descriptor); @@ -98,7 +98,7 @@ public class SSTableTest extends SchemaLoader { List<ByteBuffer> keys = new ArrayList<ByteBuffer>(map.keySet()); //Collections.shuffle(keys); - RandomAccessReader file = sstable.openDataReader(false); + RandomAccessReader file = sstable.openDataReader(); for (ByteBuffer key : keys) { file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position); http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 2b0a13a..fcfaeec 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -72,8 +72,8 @@ public class SSTableUtils public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) throws IOException { - SSTableScanner slhs = lhs.getDirectScanner(); - SSTableScanner srhs = rhs.getDirectScanner(); + SSTableScanner slhs = lhs.getScanner(); + SSTableScanner srhs = rhs.getScanner(); while (slhs.hasNext()) { 
OnDiskAtomIterator ilhs = slhs.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/27ed655f/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java index 8059bbd..90c27e3 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java @@ -338,7 +338,7 @@ public class BufferedRandomAccessFileTest for (final int offset : Arrays.asList(0, 8)) { File file1 = writeTemporaryFile(new byte[16]); - final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, false, null); + final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, null); expectEOF(new Callable<Object>() { public Object call() throws IOException @@ -353,7 +353,7 @@ public class BufferedRandomAccessFileTest for (final int n : Arrays.asList(1, 2, 4, 8)) { File file1 = writeTemporaryFile(new byte[16]); - final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, false, null); + final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, null); expectEOF(new Callable<Object>() { public Object call() throws IOException
