merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b235995
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b235995
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b235995

Branch: refs/heads/trunk
Commit: 0b2359959f792e0a7a4514a79ef3bfdd32e7c83e
Parents: cfc163d f932aa2
Author: Jonathan Ellis <[email protected]>
Authored: Wed Jun 19 10:27:08 2013 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Wed Jun 19 11:01:17 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 -
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../cassandra/io/sstable/SSTableReader.java     | 164 ++++++-------------
 4 files changed, 54 insertions(+), 115 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b235995/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1450aa0,e1282aa..7e1bf77
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,69 -1,5 +1,68 @@@
+2.0
+ * Removed on-heap row cache (CASSANDRA-5348)
+ * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
+ * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
+ * Experimental triggers (CASSANDRA-1311)
+ * JEMalloc support for off-heap allocation (CASSANDRA-3997)
+ * Single-pass compaction (CASSANDRA-4180)
+ * Removed token range bisection (CASSANDRA-5518)
+ * Removed compatibility with pre-1.2.5 sstables and network messages
+   (CASSANDRA-5511)
+ * removed PBSPredictor (CASSANDRA-5455)
+ * CAS support (CASSANDRA-5062, 5441, 5442, 5443)
+ * Leveled compaction performs size-tiered compactions in L0
+   (CASSANDRA-5371, 5439)
+ * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
+ * Log when a node is down longer than the hint window (CASSANDRA-4554)
+ * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
+ * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
+ * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
+ * Change Message IDs to ints (CASSANDRA-5307)
+ * Move sstable level information into the Stats component, removing the
+   need for a separate Manifest file (CASSANDRA-4872)
+ * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
+ * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
+ * add default_time_to_live (CASSANDRA-3974)
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+ * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
+ * upgrade thrift to 0.9.0 (CASSANDRA-3719)
+ * drop unnecessary keyspace parameter from user-defined compaction API
+   (CASSANDRA-5139)
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
+ * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
+ * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
+ * Allow custom configuration loader (CASSANDRA-5045)
+ * Remove memory emergency pressure valve logic (CASSANDRA-3534)
+ * Reduce request latency with eager retry (CASSANDRA-4705)
+ * cqlsh: Remove ASSUME command (CASSANDRA-5331)
+ * Rebuild BF when loading sstables if bloom_filter_fp_chance
+   has changed since compaction (CASSANDRA-5015)
+ * remove row-level bloom filters (CASSANDRA-4885)
+ * Change Kernel Page Cache skipping into row preheating (disabled by default)
+   (CASSANDRA-4937)
+ * Improve repair by deciding on a gcBefore before sending
+   out TreeRequests (CASSANDRA-4932)
+ * Add an official way to disable compactions (CASSANDRA-5074)
+ * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
+ * Add binary protocol versioning (CASSANDRA-5436)
+ * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
+ * Add alias support to SELECT statement (CASSANDRA-5075)
+ * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
+ * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
+ * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
+ * Track max/min column names in sstables to be able to optimize slice
+   queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
+ * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
+ * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
+ * Support native link w/o JNA in Java7 (CASSANDRA-3734)
+ * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
+ * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
+ * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+   (CASSANDRA-5149)
+
 1.2.6
  * Fix cross-DC mutation forwarding (CASSANDRA-5632)
- * Reduce SSTableLoader memory usage (CASSANDRA-5555)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
  * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
  * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b235995/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 648c25a,429859e..735f627
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -257,8 -232,34 +257,8 @@@ public class ColumnFamilyStore implemen
          if (loadSSTables)
          {
              Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
-             Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
+             Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
-             if (metadata.getDefaultValidator().isCommutative())
-             {
-                 // Filter non-compacted sstables, remove compacted ones
-                 Set<Integer> compactedSSTables = new HashSet<Integer>();
-                 for (SSTableReader sstable : sstables)
-                     compactedSSTables.addAll(sstable.getAncestors());
-
-                 Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
-                 for (SSTableReader sstable : sstables)
-                 {
-                     if (compactedSSTables.contains(sstable.descriptor.generation))
-                     {
-                         logger.info("{} is already compacted and will be removed.", sstable);
-                         sstable.markCompacted(); // we need to mark as compacted to be deleted
-                         sstable.releaseReference(); // this amount to deleting the sstable
-                     }
-                     else
-                     {
-                         liveSSTables.add(sstable);
-                     }
-                 }
-                 data.addInitialSSTables(liveSSTables);
-             }
-             else
-             {
-                 data.addInitialSSTables(sstables);
-             }
+             data.addInitialSSTables(sstables);
          }

          if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b235995/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b235995/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f63ea6a,ea9c451..85f2677
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -175,17 -160,47 +161,39 @@@ public class SSTableReader extends SSTa
                                        IPartitioner partitioner,
                                        boolean validate) throws IOException
      {
+         assert partitioner != null;
+         // Minimum components without which we can't do anything
+         assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+         assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
-         long start = System.currentTimeMillis();
+         long start = System.nanoTime();
-         SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+         logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+
+         SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
+
+         // Check if sstable is created using same partitioner.
+         // Partitioner can be null, which indicates older version of sstable or no stats available.
+         // In that case, we skip the check.
+         String partitionerName = partitioner.getClass().getCanonicalName();
+         if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
+         {
+             logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+                                        descriptor, sstableMetadata.partitioner, partitionerName));
+             System.exit(1);
+         }

          SSTableReader sstable = new SSTableReader(descriptor,
                                                    components,
                                                    metadata,
                                                    partitioner,
+                                                   null,
+                                                   null,
+                                                   null,
+                                                   null,
                                                    System.currentTimeMillis(),
                                                    sstableMetadata);
-         // versions before 'c' encoded keys as utf-16 before hashing to the filter
-         if (descriptor.version.hasStringsInBloomFilter)
-         {
-             sstable.load(true);
-         }
-         else
-         {
-             sstable.load(false);
-             sstable.loadBloomFilter();
-         }
+
+         sstable.load();
          if (validate)
              sstable.validate();
@@@ -388,55 -349,16 +361,16 @@@
      {
          SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
          SegmentedFile.Builder dbuilder = compression
-                                          ? SegmentedFile.getCompressedBuilder()
-                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-
-
-         boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
-         if (recreateBloomFilter || !summaryLoaded)
-             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
-
-         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-         if (recreateBloomFilter || !summaryLoaded) // save summary information to disk
-             saveSummary(this, ibuilder, dbuilder);
-     }
-
-     /**
-      * A simplified load that creates a minimal partition index
-      */
-     private void loadForBatch() throws IOException
-     {
-         // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
-         SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
-         SegmentedFile.Builder dbuilder = compression
-                                          ? SegmentedFile.getCompressedBuilder()
-                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                                          ? SegmentedFile.getCompressedBuilder()
-                                          : new BufferedSegmentedFile.Builder();
-
-         // build a bare-bones IndexSummary
-         IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1, 1);
-         RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-         try
-         {
-             first = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
-             summaryBuilder.maybeAddEntry(first, 0);
-             indexSummary = summaryBuilder.build(partitioner);
-         }
-         finally
-         {
-             FileUtils.closeQuietly(in);
-         }
-
-         last = null; // shouldn't need this for batch operations
-
-         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-     }
++                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
-     {
          // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
-         RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+         RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+
+         // try to load summaries from the disk and check if we need
+         // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
-         final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
-         final boolean readIndex = recreatebloom || !summaryLoaded;
++        final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
++        final boolean readIndex = recreateBloomFilter || !summaryLoaded;
          try
          {
              long indexSize = primaryIndex.length();
@@@ -450,10 -371,10 +384,10 @@@
              IndexSummaryBuilder summaryBuilder = null;
              if (!summaryLoaded)
-                 summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
+                 summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval());
              long indexPosition;
-             while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+             while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
              {
                  ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
                  RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@@ -484,12 -405,18 +418,18 @@@
          first = getMinimalKey(first);
          last = getMinimalKey(last);
+         // finalize the load.
+         // finalize the state of the reader
+         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+         if (readIndex) // save summary information to disk
+             saveSummary(this, ibuilder, dbuilder);
      }

-     public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
      {
          File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
-         if (!summariesFile.exists())
+         if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists())
              return false;

          DataInputStream iStream = null;
@@@ -602,7 -523,7 +542,7 @@@
      public IFilter getBloomFilter()
      {
--        return bf;
++        return bf;
      }

      public long getBloomFilterSerializedSize()
@@@ -1059,44 -941,33 +999,44 @@@
       */
      public SSTableScanner getScanner(QueryFilter filter)
      {
-         return new SSTableScanner(this, filter);
+         return new SSTableScanner(this, filter, null);
+     }
+
+     public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
+     {
+         return new SSTableScanner(this, filter, startWith, null);
+     }
+
+     /**
-      * I/O SSTableScanner
-      * @return A Scanner for seeking over the rows of the SSTable.
-      */
++     * I/O SSTableScanner
++     * @return A Scanner for seeking over the rows of the SSTable.
++     */
+     public SSTableScanner getScanner()
+     {
+         return getScanner((RateLimiter) null);
      }

-     /**
-      * Direct I/O SSTableScanner
-      * @return A Scanner for seeking over the rows of the SSTable.
-      */
-     public SSTableScanner getDirectScanner(RateLimiter limiter)
-     {
-         return new SSTableScanner(this, true, limiter);
-     }
+     public SSTableScanner getScanner(RateLimiter limiter)
+     {
+         return new SSTableScanner(this, null, limiter);
+     }

-     /**
-      * Direct I/O SSTableScanner over a defined range of tokens.
-      *
-      * @param range the range of keys to cover
-      * @return A Scanner for seeking over the rows of the SSTable.
-      */
-     public ICompactionScanner getDirectScanner(Range<Token> range, RateLimiter limiter)
+     /**
-      * Direct I/O SSTableScanner over a defined range of tokens.
-      *
-      * @param range the range of keys to cover
-      * @return A Scanner for seeking over the rows of the SSTable.
-      */
++     * Direct I/O SSTableScanner over a defined range of tokens.
++     *
++     * @param range the range of keys to cover
++     * @return A Scanner for seeking over the rows of the SSTable.
++     */
+     public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
      {
          if (range == null)
-             return getDirectScanner(limiter);
+             return getScanner(limiter);

          Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
-         return rangeIterator.hasNext()
-                ? new SSTableBoundedScanner(this, true, rangeIterator, limiter)
-                : new EmptyCompactionScanner(getFilename());
+         if (rangeIterator.hasNext())
+             return new SSTableScanner(this, null, range, limiter);
+         else
+             return new EmptyCompactionScanner(getFilename());
      }

      public FileDataInput getFileDataInput(long position)
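
The SSTableReader change above folds the old buildSummary()/loadForBatch() paths into a single load(): the reader first tries to reuse the Summary component saved on disk, and only re-reads the primary index (and re-saves the summary) when no usable summary exists or the Bloom filter must be rebuilt, i.e. readIndex = recreateBloomFilter || !summaryLoaded. Below is a minimal standalone sketch of that decision flow, not Cassandra's actual code: the names (SummaryLoadSketch, buildSummaryFromIndex, loadSummary, saveSummary), the fixed INDEX_INTERVAL, and the simplified index record layout (short-length-prefixed key followed by an 8-byte position) are all assumptions made for illustration.

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative stand-in for the load()/loadSummary()/saveSummary() interplay
    // in SSTableReader; names and on-disk layout here are assumptions, not the
    // real Cassandra formats.
    public class SummaryLoadSketch
    {
        static final int INDEX_INTERVAL = 128; // assumed sampling interval (cf. index_interval, CASSANDRA-3961)

        static List<Long> load(Path index, Path summary, boolean recreateBloomFilter) throws IOException
        {
            // 1. Prefer the summary that was persisted next to the sstable.
            List<Long> sampled = loadSummary(summary);
            boolean summaryLoaded = sampled != null;

            // 2. Read the primary index only if the summary is unusable or the
            //    Bloom filter must be rebuilt from the keys (the readIndex guard).
            boolean readIndex = recreateBloomFilter || !summaryLoaded;
            if (readIndex)
            {
                sampled = buildSummaryFromIndex(index);
                saveSummary(summary, sampled); // save summary information to disk for the next open
            }
            return sampled;
        }

        // Sample every INDEX_INTERVAL-th entry position in a single pass over the index.
        static List<Long> buildSummaryFromIndex(Path index) throws IOException
        {
            List<Long> positions = new ArrayList<>();
            long indexSize = Files.size(index);
            try (DataInputStream in = new DataInputStream(new BufferedInputStream(Files.newInputStream(index))))
            {
                long indexPosition = 0;
                int entries = 0;
                while (indexPosition != indexSize)
                {
                    int keyLength = in.readUnsignedShort(); // assumed: key is short-length-prefixed
                    in.readFully(new byte[keyLength]);      // key bytes, unused for the sample
                    in.readLong();                          // assumed: 8-byte row position follows the key
                    if (entries++ % INDEX_INTERVAL == 0)
                        positions.add(indexPosition);
                    indexPosition += 2 + keyLength + 8;
                }
            }
            return positions;
        }

        static List<Long> loadSummary(Path summary)
        {
            if (!Files.exists(summary))
                return null; // missing summary component: caller rebuilds from the index
            try (DataInputStream in = new DataInputStream(Files.newInputStream(summary)))
            {
                int count = in.readInt();
                List<Long> positions = new ArrayList<>(count);
                for (int i = 0; i < count; i++)
                    positions.add(in.readLong());
                return positions;
            }
            catch (IOException e)
            {
                return null; // treat a truncated or corrupt summary like a missing one
            }
        }

        static void saveSummary(Path summary, List<Long> positions) throws IOException
        {
            try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(summary)))
            {
                out.writeInt(positions.size());
                for (long position : positions)
                    out.writeLong(position);
            }
        }
    }

Calling load(indexPath, summaryPath, false) returns the sampled index positions, rebuilding and persisting the summary only when it is missing or a rebuild is forced, which mirrors how the patched load() skips the primary-index scan when a saved summary can be reused.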
