Fix merkle tree depth calculation. Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-12580.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630 Branch: refs/heads/trunk Commit: c70ce6307da824529762ff40673642b6f86972aa Parents: 2383935 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Tue Aug 30 21:06:39 2016 -0300 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Sep 29 14:26:53 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../db/compaction/CompactionManager.java | 4 +- .../org/apache/cassandra/repair/Validator.java | 9 +- .../org/apache/cassandra/utils/MerkleTree.java | 10 ++ .../db/compaction/CompactionsTest.java | 2 +- .../apache/cassandra/repair/ValidatorTest.java | 159 ++++++++++++------- 6 files changed, 125 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 998849e..97bc70a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.9 + * Fix merkle tree depth calculation (CASSANDRA-12580) * Make Collections deserialization more robust (CASSANDRA-12618) @@ -36,7 +37,6 @@ * Don't write shadowed range tombstone (CASSANDRA-12030) Merged from 2.1: * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363) - * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127) * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828) * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040) * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index cf82498..78fa23c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean { numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range)); } - // determine tree depth from number of partitions, but cap at 20 to prevent large tree. - int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0; + // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263) + int depth = numPartitions > 0 ? 
(int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0; MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); long start = System.nanoTime(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index 4db1cfb..8dbb4cf 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -54,6 +54,7 @@ public class Validator implements Runnable public final RepairJobDesc desc; public final InetAddress initiator; public final int gcBefore; + private final boolean evenTreeDistribution; // null when all rows with the min token have been consumed private long validated; @@ -67,19 +68,25 @@ public class Validator implements Runnable public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore) { + this(desc, initiator, gcBefore, false); + } + + public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution) + { this.desc = desc; this.initiator = initiator; this.gcBefore = gcBefore; validated = 0; range = null; ranges = null; + this.evenTreeDistribution = evenTreeDistribution; } public void prepare(ColumnFamilyStore cfs, MerkleTree tree) { this.tree = tree; - if (!tree.partitioner().preservesOrder()) + if (!tree.partitioner().preservesOrder() || evenTreeDistribution) { // You can't beat an even tree distribution for md5 tree.init(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java 
b/src/java/org/apache/cassandra/utils/MerkleTree.java index 4fec62d..1e0f505 100644 --- a/src/java/org/apache/cassandra/utils/MerkleTree.java +++ b/src/java/org/apache/cassandra/utils/MerkleTree.java @@ -516,6 +516,16 @@ public class MerkleTree implements Serializable return histbuild.buildWithStdevRangesAroundMean(); } + public long rowCount() + { + long count = 0; + for (TreeRange range : new TreeRangeIterator(this)) + { + count += range.hashable.rowsInRange; + } + return count; + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 8ff3022..471f8cf 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -125,7 +125,7 @@ public class CompactionsTest return store; } - private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl) + public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl) { long timestamp = System.currentTimeMillis(); for (int i = startRowKey; i <= endRowKey; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index a9f18f5..61ab3da 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -20,8 +20,16 @@ package org.apache.cassandra.repair; import java.io.IOException; import java.net.InetAddress; import 
java.security.MessageDigest; +import java.util.Collections; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.CompactionsTest; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.SequentialWriter; import org.junit.After; import org.junit.BeforeClass; @@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationComplete; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.SimpleCondition; import static org.junit.Assert.*; public class ValidatorTest { + private static final long TEST_TIMEOUT = 60; //seconds + private static final String keyspace = "ValidatorTest"; private static final String columnFamily = "Standard1"; private final IPartitioner partitioner = StorageService.getPartitioner(); @@ -81,35 +94,7 @@ public class ValidatorTest Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range); - final SimpleCondition lock = new SimpleCondition(); - MessagingService.instance().addMessageSink(new IMessageSink() - { - @SuppressWarnings("unchecked") - public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) - { - try - { - if (message.verb == 
MessagingService.Verb.REPAIR_MESSAGE) - { - RepairMessage m = (RepairMessage) message.payload; - assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); - assertEquals(desc, m.desc); - assertTrue(((ValidationComplete) m).success); - assertNotNull(((ValidationComplete) m).tree); - } - } - finally - { - lock.signalAll(); - } - return false; - } - - public boolean allowIncomingMessage(MessageIn message, int id) - { - return false; - } - }); + final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); InetAddress remote = InetAddress.getByName("127.0.0.2"); @@ -131,8 +116,13 @@ public class ValidatorTest Token min = tree.partitioner().getMinimumToken(); assertNotNull(tree.hash(new Range<>(min, min))); - if (!lock.isSignaled()) - lock.await(); + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = (RepairMessage) message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); + assertTrue(((ValidationComplete) m).success); + assertNotNull(((ValidationComplete) m).tree); } private static class CompactedRowStub extends AbstractCompactedRow @@ -163,27 +153,91 @@ public class ValidatorTest Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range); - final SimpleCondition lock = new SimpleCondition(); + final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); + + InetAddress remote = InetAddress.getByName("127.0.0.2"); + + Validator validator = new Validator(desc, remote, 0); + validator.fail(); + + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = (RepairMessage) 
message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); + assertFalse(((ValidationComplete) m).success); + assertNull(((ValidationComplete) m).tree); + } + + @Test + public void simpleValidationTest128() throws Exception + { + simpleValidationTest(128); + } + + @Test + public void simpleValidationTest1500() throws Exception + { + simpleValidationTest(1500); + } + + /** + * Test for CASSANDRA-5263 + * 1. Create N rows + * 2. Run validation compaction + * 3. Expect merkle tree with size 2^(log2(n)) + */ + public void simpleValidationTest(int n) throws Exception + { + Keyspace ks = Keyspace.open(keyspace); + ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily); + cfs.clearUnsafe(); + + // disable compaction while flushing + cfs.disableAutoCompaction(); + + CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s + + cfs.forceBlockingFlush(); + assertEquals(1, cfs.getSSTables().size()); + + // wait enough to force single compaction + TimeUnit.SECONDS.sleep(5); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + UUID repairSessionId = UUIDGen.getTimeUUID(); + final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(), + cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(), + sstable.last.getToken())); + + ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(), + Collections.singletonList(cfs), Collections.singleton(desc.range), + false, false); + + final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); + Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true); + CompactionManager.instance.submitValidation(cfs, validator); + + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = 
(RepairMessage) message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); + assertTrue(((ValidationComplete) m).success); + MerkleTree tree = ((ValidationComplete) m).tree; + + assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0); + assertEquals(tree.rowCount(), n); + } + + private ListenableFuture<MessageOut> registerOutgoingMessageSink() + { + final SettableFuture<MessageOut> future = SettableFuture.create(); MessagingService.instance().addMessageSink(new IMessageSink() { - @SuppressWarnings("unchecked") public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) { - try - { - if (message.verb == MessagingService.Verb.REPAIR_MESSAGE) - { - RepairMessage m = (RepairMessage) message.payload; - assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); - assertEquals(desc, m.desc); - assertFalse(((ValidationComplete) m).success); - assertNull(((ValidationComplete) m).tree); - } - } - finally - { - lock.signalAll(); - } + future.set(message); return false; } @@ -192,13 +246,6 @@ public class ValidatorTest return false; } }); - - InetAddress remote = InetAddress.getByName("127.0.0.2"); - - Validator validator = new Validator(desc, remote, 0); - validator.fail(); - - if (!lock.isSignaled()) - lock.await(); + return future; } }