cut down on the number of sstables compared for version and purge checks
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45af95ab Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45af95ab Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45af95ab Branch: refs/heads/trunk Commit: 45af95ab30d63829342f26c237b5c2cd186cf5e3 Parents: 18b5564 Author: Jonathan Ellis <[email protected]> Authored: Fri Mar 23 12:48:56 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Mon Mar 26 15:53:22 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ColumnFamilyStore.java | 34 ++++++------ src/java/org/apache/cassandra/db/DataTracker.java | 16 +++--- .../db/compaction/CompactionController.java | 39 ++++++++------- 3 files changed, 46 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 81fa285..4475070 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -30,10 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import javax.management.*; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; +import com.google.common.collect.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +66,7 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.IntervalTree.Interval; +import org.apache.cassandra.utils.IntervalTree.IntervalTree; import org.cliffc.high_scale_lib.NonBlockingHashMap; import static org.apache.cassandra.config.CFMetaData.Caching; @@ -844,23 +842,25 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } /** - * Uses bloom filters to check if key may be present in any sstable in this - * ColumnFamilyStore, minus a set of provided ones. - * - * Because BFs are checked, negative returns ensure that the key is not - * present in the checked SSTables, but positive ones doesn't ensure key - * presence. + * @param sstables + * @return sstables whose key range overlaps with that of the given sstables, not including itself. + * (The given sstables may or may not overlap with each other.) */ - public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends SSTable> sstablesToIgnore) + public Set<SSTableReader> getOverlappingSSTables(Collection<SSTableReader> sstables) { - // we don't need to acquire references here, since the bloom filter is safe to use even post-compaction - List<SSTableReader> filteredSSTables = data.getView().intervalTree.search(new Interval(key, key)); - for (SSTableReader sstable : filteredSSTables) + assert !sstables.isEmpty(); + IntervalTree<SSTableReader> tree = data.getView().intervalTree; + + Set<SSTableReader> results = null; + for (SSTableReader sstable : sstables) { - if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key)) - return true; + Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(new Interval<SSTableReader>(sstable.first, sstable.last))); + assert overlaps.contains(sstable); + results = results == null ? overlaps : Sets.union(results, overlaps); } - return false; + results = Sets.difference(results, ImmutableSet.copyOf(sstables)); + + return results; } /* http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index d6b5fe5..71577b2 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -552,6 +552,14 @@ public class DataTracker assert found : consumer + " not subscribed"; } + public static IntervalTree<SSTableReader> buildIntervalTree(Iterable<SSTableReader> sstables) + { + List<Interval> intervals = new ArrayList<Interval>(Iterables.size(sstables)); + for (SSTableReader sstable : sstables) + intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable)); + return new IntervalTree<SSTableReader>(intervals); + } + /** * An immutable structure holding the current memtable, the memtables pending * flush, the sstables for a column family, and the sstables that are active @@ -584,14 +592,6 @@ public class DataTracker return Sets.difference(ImmutableSet.copyOf(sstables), compacting); } - private IntervalTree buildIntervalTree(List<SSTableReader> sstables) - { - List<Interval> intervals = new ArrayList<Interval>(sstables.size()); - for (SSTableReader sstable : sstables) - intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable)); - return new IntervalTree<SSTableReader>(intervals); - } - public View switchMemtable(Memtable newMemtable) { Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/45af95ab/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index 7db5723..1da6f9c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -21,17 +21,17 @@ package org.apache.cassandra.db.compaction; import java.util.*; +import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.EchoedRow; +import org.apache.cassandra.db.*; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.utils.IntervalTree.Interval; +import org.apache.cassandra.utils.IntervalTree.IntervalTree; /** * Manage compaction options. @@ -41,8 +41,8 @@ public class CompactionController private static Logger logger = LoggerFactory.getLogger(CompactionController.class); private final ColumnFamilyStore cfs; - private final Set<SSTableReader> sstables; - private final boolean forceDeserialize; + private final boolean deserializeRequired; + private final IntervalTree<SSTableReader> overlappingTree; public final int gcBefore; public boolean keyExistenceIsExpensive; @@ -52,15 +52,16 @@ public class CompactionController { assert cfs != null; this.cfs = cfs; - this.sstables = new HashSet<SSTableReader>(sstables); this.gcBefore = gcBefore; // If we merge an old NodeId id, we must make sure that no further increment for that id are in an active memtable. // For that, we must make sure that this id was renewed before the creation of the oldest unflushed memtable. We // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our // current 'stop all write during memtable switch' situation). this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000); - this.forceDeserialize = forceDeserialize; - keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(this.sstables); + deserializeRequired = forceDeserialize || !allLatestVersion(sstables); + Set<SSTableReader> overlappingSSTables = cfs.getOverlappingSSTables(sstables); + overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables); + keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables)); } public String getKeyspace() @@ -79,19 +80,21 @@ public class CompactionController */ public boolean shouldPurge(DecoratedKey key) { - return !cfs.isKeyInRemainingSSTables(key, sstables); + List<SSTableReader> filteredSSTables = overlappingTree.search(new Interval(key, key)); + for (SSTableReader sstable : filteredSSTables) + { + if (sstable.getBloomFilter().isPresent(key.key)) + return false; + } + return true; } - public boolean needDeserialize() + private static boolean allLatestVersion(Iterable<SSTableReader> sstables) { - if (forceDeserialize) - return true; - for (SSTableReader sstable : sstables) if (!sstable.descriptor.isLatestVersion) - return true; - - return false; + return false; + return true; } public void invalidateCachedRow(DecoratedKey key) @@ -128,7 +131,7 @@ public class CompactionController // in-memory echoedrow is only enabled if we think checking for the key's existence in the other sstables, // is going to be less expensive than simply de/serializing the row again - if (rows.size() == 1 && !needDeserialize() + if (rows.size() == 1 && !deserializeRequired && (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit() || !keyExistenceIsExpensive) && !shouldPurge(rows.get(0).getKey())) {
