Adjust MT depth based on # of partitions validating

patch by yukim; reviewed by jbellis for CASSANDRA-5263
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef4a07b4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef4a07b4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef4a07b4 Branch: refs/heads/trunk Commit: ef4a07b4b62eb448b6c1752250896fc861ff29a4 Parents: e7dbdd8 Author: Yuki Morishita <[email protected]> Authored: Fri Jun 27 12:13:02 2014 -0500 Committer: Yuki Morishita <[email protected]> Committed: Fri Jun 27 12:13:02 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 28 ++++++++++++++++- .../org/apache/cassandra/repair/Validator.java | 32 +++++++------------- .../apache/cassandra/repair/ValidatorTest.java | 10 +++--- .../cassandra/service/SerializationsTest.java | 11 ++++--- 5 files changed, 51 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7de31c5..b30e9a2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,7 @@ 2.1.1 * Improve schema merge performance (CASSANDRA-7444) * Fix NPE when unknown prepared statement ID is used (CASSANDRA-7454) + * Adjust MT depth based on # of partition validating (CASSANDRA-5263) 2.1.0 http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 227f908..fed7ec7 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -87,6 +87,7 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -892,13 +893,26 @@ public class CompactionManager implements CompactionManagerMBean gcBefore = getDefaultGcBefore(cfs); } + // Create Merkle tree suitable to hold estimated partitions for given range. + // We blindly assume that partition is evenly distributed on all sstables for now. + long numPartitions = 0; + for (SSTableReader sstable : sstables) + { + numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range)); + } + // determine tree depth from number of partitions, but cap at 20 to prevent large tree. + int depth = numPartitions > 0 ? 
(int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0; + MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); + CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore); CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); + + long start = System.nanoTime(); metrics.beginCompaction(ci); try { // validate the CF as we iterate over it - validator.prepare(cfs); + validator.prepare(cfs, tree); while (iter.hasNext()) { if (ci.isStopRequested()) @@ -919,6 +933,18 @@ public class CompactionManager implements CompactionManagerMBean metrics.finishCompaction(ci); } + + if (logger.isDebugEnabled()) + { + // MT serialize may take time + long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}", + duration, + depth, + numPartitions, + MerkleTree.serializer.serializedSize(tree, 0), + validator.desc); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index d93b4a5..641717e 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.compaction.AbstractCompactedRow; @@ -52,41 +51,32 @@ public class Validator implements Runnable public final RepairJobDesc desc; public final 
InetAddress initiator; - public final MerkleTree tree; public final int gcBefore; // null when all rows with the min token have been consumed - private transient long validated; - private transient MerkleTree.TreeRange range; - private transient MerkleTree.TreeRangeIterator ranges; - private transient DecoratedKey lastKey; + private long validated; + private MerkleTree tree; + // current range being updated + private MerkleTree.TreeRange range; + // iterator for iterating sub ranges (MT's leaves) + private MerkleTree.TreeRangeIterator ranges; + // last key seen + private DecoratedKey lastKey; - /** - * Create Validator with default size of initial Merkle Tree. - */ public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore) { - this(desc, - initiator, - // TODO: memory usage (maxsize) should either be tunable per - // CF, globally, or as shared for all CFs in a cluster - new MerkleTree(DatabaseDescriptor.getPartitioner(), desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)), - gcBefore); - } - - public Validator(RepairJobDesc desc, InetAddress initiator, MerkleTree tree, int gcBefore) - { this.desc = desc; this.initiator = initiator; - this.tree = tree; this.gcBefore = gcBefore; validated = 0; range = null; ranges = null; } - public void prepare(ColumnFamilyStore cfs) + public void prepare(ColumnFamilyStore cfs, MerkleTree tree) { + this.tree = tree; + if (!tree.partitioner().preservesOrder()) { // You can't beat an even tree distribution for md5 http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index c3ce810..4d65cdb 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -46,6 +46,7 @@ import 
org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.concurrent.SimpleCondition; import static org.junit.Assert.*; @@ -103,10 +104,11 @@ public class ValidatorTest extends SchemaLoader ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily); Validator validator = new Validator(desc, remote, 0); - validator.prepare(cfs); + MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15)); + validator.prepare(cfs, tree); // and confirm that the tree was split - assertTrue(validator.tree.size() > 1); + assertTrue(tree.size() > 1); // add a row Token mid = partitioner.midpoint(range.left, range.right); @@ -114,8 +116,8 @@ public class ValidatorTest extends SchemaLoader validator.complete(); // confirm that the tree was validated - Token min = validator.tree.partitioner().getMinimumToken(); - assertNotNull(validator.tree.hash(new Range<>(min, min))); + Token min = tree.partitioner().getMinimumToken(); + assertNotNull(tree.hash(new Range<>(min, min))); if (!lock.isSignaled()) lock.await(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/test/unit/org/apache/cassandra/service/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index 6937ceb..49632f9 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -93,17 +93,18 @@ public class SerializationsTest extends AbstractSerializationsTester private void testValidationCompleteWrite() throws IOException { + 
IPartitioner p = new RandomPartitioner(); // empty validation + MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15)); Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1); - ValidationComplete c0 = new ValidationComplete(DESC, v0.tree); + ValidationComplete c0 = new ValidationComplete(DESC, mt); // validation with a tree - IPartitioner p = new RandomPartitioner(); - MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE); + mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE); for (int i = 0; i < 10; i++) mt.split(p.getRandomToken()); - Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), mt, -1); - ValidationComplete c1 = new ValidationComplete(DESC, v1.tree); + Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1); + ValidationComplete c1 = new ValidationComplete(DESC, mt); // validation failed ValidationComplete c3 = new ValidationComplete(DESC);
