Add range tombstones to read repair digests
patch by Oleg Anastasyev; reviewed by jbellis for CASSANDRA-6863
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2e74354 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2e74354 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2e74354 Branch: refs/heads/trunk Commit: a2e74354ca51809a11b62dd7995c026807683b0a Parents: be2686d Author: Jonathan Ellis <[email protected]> Authored: Tue Apr 22 07:27:20 2014 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Apr 22 07:27:20 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamily.java | 5 ++ .../org/apache/cassandra/db/DeletionInfo.java | 34 +++++++++- .../apache/cassandra/db/RangeTombstoneList.java | 68 ++++++++++++++++++-- .../apache/cassandra/net/MessagingService.java | 32 ++++++++- test/unit/org/apache/cassandra/Util.java | 7 ++ .../apache/cassandra/db/ColumnFamilyTest.java | 46 ++++++++++++- test/unit/org/apache/cassandra/db/RowTest.java | 40 ++++++++++-- 8 files changed, 222 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ae7410e..495dab2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.0-beta2 + * Add range tombstones to read repair digests (CASSANDRA-6863) * Fix BTree.clear for large updates (CASSANDRA-6943) * Fail write instead of logging a warning when unable to append to CL (CASSANDRA-6764) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index da404b0..4f85610 100644 --- 
a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -313,8 +313,11 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry } } + cfDiff.setDeletionInfo(deletionInfo().diff(cfComposite.deletionInfo())); + if (!cfDiff.isEmpty()) return cfDiff; + return null; } @@ -385,6 +388,8 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry { for (Cell cell : this) cell.updateDigest(digest); + if (MessagingService.instance().areAllNodesAtLeast21()) + deletionInfo().updateDigest(digest); } public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index 8601bce..a167b85 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -19,7 +19,9 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.IOException; -import java.util.*; +import java.security.MessageDigest; +import java.util.Comparator; +import java.util.Iterator; import com.google.common.base.Objects; import com.google.common.collect.Iterators; @@ -29,6 +31,7 @@ import org.apache.cassandra.db.composites.CType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; /** @@ -168,6 +171,35 @@ public class DeletionInfo implements IMeasurableMemory } /** + * Evaluates difference between this deletion info and superset for read repair + * + * @return the difference between the two, or LIVE if no difference + */ + public DeletionInfo 
diff(DeletionInfo superset) + { + RangeTombstoneList rangeDiff = superset.ranges == null || superset.ranges.isEmpty() + ? null + : ranges == null ? superset.ranges : ranges.diff(superset.ranges); + + return topLevel.markedForDeleteAt != superset.topLevel.markedForDeleteAt || rangeDiff != null + ? new DeletionInfo(superset.topLevel, rangeDiff) + : DeletionInfo.live(); + } + + + /** + * Digests deletion info. Used to trigger read repair on mismatch. + */ + public void updateDigest(MessageDigest digest) + { + if (topLevel.markedForDeleteAt != Long.MIN_VALUE) + digest.update(ByteBufferUtil.bytes(topLevel.markedForDeleteAt)); + + if (ranges != null) + ranges.updateDigest(digest); + } + + /** * Returns true if {@code purge} would remove the top-level tombstone or any of the range * tombstones, false otherwise. * @param gcBefore timestamp (in seconds) before which tombstones should be purged http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/db/RangeTombstoneList.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java index dd0b9a6..b06c520 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java +++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java @@ -19,12 +19,16 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.db.composites.CType; @@ -32,10 +36,7 @@ import org.apache.cassandra.db.composites.Composite; import 
org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; - import org.apache.cassandra.utils.ObjectSizes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Data structure holding the range tombstones of a ColumnFamily. @@ -384,6 +385,64 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable } }; } + + /** + * Evaluates a diff between superset (known to be all merged tombstones) and this list for read repair + * + * @return null if there is no difference + */ + public RangeTombstoneList diff(RangeTombstoneList superset) + { + if (isEmpty()) + return superset; + + assert size <= superset.size; + + RangeTombstoneList diff = null; + + int j = 0; // index to iterate through our own list + for (int i = 0; i < superset.size; i++) + { + boolean sameStart = j < size && starts[j].equals(superset.starts[i]); + // don't care about local deletion time here. for RR it doesn't makes sense + if (!sameStart + || !ends[j].equals(superset.ends[i]) + || markedAts[j] != superset.markedAts[i]) + { + if (diff == null) + diff = new RangeTombstoneList(comparator, Math.min(8, superset.size - i)); + diff.add(superset.starts[i], superset.ends[i], superset.markedAts[i], superset.delTimes[i]); + + if (sameStart) + j++; + } + else + { + j++; + } + } + + return diff; + } + + /** + * Calculates digest for triggering read repair on mismatch + */ + public void updateDigest(MessageDigest digest) + { + ByteBuffer longBuffer = ByteBuffer.allocate(8); + for (int i = 0; i < size; i++) + { + for (int j = 0; j < starts[i].size(); j++) + digest.update(starts[i].get(j).duplicate()); + for (int j = 0; j < ends[i].size(); j++) + digest.update(ends[i].get(j).duplicate()); + + longBuffer.putLong(0, markedAts[i]); + digest.update(longBuffer.array(), 0, 8); + } + } + @Override public boolean equals(Object o) @@ -393,7 +452,7 @@ public class RangeTombstoneList implements 
Iterable<RangeTombstone>, IMeasurable RangeTombstoneList that = (RangeTombstoneList)o; if (size != that.size) return false; - + for (int i = 0; i < size; i++) { if (!starts[i].equals(that.starts[i])) @@ -779,4 +838,5 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable return false; } } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 6d9a1b5..4ef57d3 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -84,6 +84,8 @@ public final class MessagingService implements MessagingServiceMBean */ public static final int PROTOCOL_MAGIC = 0xCA552DFA; + private boolean allNodesAtLeast21; + /* All verb handler identifiers */ public enum Verb { @@ -760,20 +762,47 @@ public final class MessagingService implements MessagingServiceMBean return packed >>> (start + 1) - count & ~(-1 << count); } + public boolean areAllNodesAtLeast21() + { + return allNodesAtLeast21; + } + /** * @return the last version associated with address, or @param version if this is the first such version */ public int setVersion(InetAddress endpoint, int version) { logger.debug("Setting version {} for {}", version, endpoint); + if (version < VERSION_21) + allNodesAtLeast21 = false; Integer v = versions.put(endpoint, version); + + // if the version was increased to 2.0 or later, see if all nodes are >= 2.0 now + if (v != null && v < VERSION_21 && version >= VERSION_21) + refreshAllNodesAtLeast21(); + return v == null ? 
version : v; } public void resetVersion(InetAddress endpoint) { logger.debug("Reseting version for {}", endpoint); - versions.remove(endpoint); + Integer removed = versions.remove(endpoint); + if (removed != null && removed <= VERSION_21) + refreshAllNodesAtLeast21(); + } + + private void refreshAllNodesAtLeast21() + { + for (Integer version: versions.values()) + { + if (version < VERSION_21) + { + allNodesAtLeast21 = false; + return; + } + } + allNodesAtLeast21 = true; } public int getVersion(InetAddress endpoint) @@ -807,6 +836,7 @@ public final class MessagingService implements MessagingServiceMBean return versions.containsKey(endpoint); } + public void incrementDroppedMessages(Verb verb) { assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index b74f2c9..fe80009 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -368,4 +368,11 @@ public class Util throw new RuntimeException(e); } } + + public static RangeTombstone tombstone(String start, String finish, long timestamp, int localtime) + { + Composite startName = CellNames.simpleDense(ByteBufferUtil.bytes(start)); + Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish)); + return new RangeTombstone(startName, endName, timestamp , localtime); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java index b791b03..7f8da96 100644 --- 
a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java @@ -22,12 +22,13 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.TreeMap; import com.google.common.collect.Iterables; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; import org.apache.cassandra.io.sstable.ColumnStats; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessagingService; @@ -35,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.Util.column; import static org.apache.cassandra.Util.cellname; +import static org.apache.cassandra.Util.tombstone; import static org.junit.Assert.assertEquals; public class ColumnFamilyTest extends SchemaLoader @@ -105,6 +107,48 @@ public class ColumnFamilyTest extends SchemaLoader } @Test + public void testDigest() + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1"); + ColumnFamily cf2 = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1"); + + ByteBuffer digest = ColumnFamily.digest(cf); + + cf.addColumn(column("col1", "", 1)); + cf2.addColumn(column("col1", "", 1)); + + assert !digest.equals(ColumnFamily.digest(cf)); + + digest = ColumnFamily.digest(cf); + assert digest.equals(ColumnFamily.digest(cf2)); + + cf.addColumn(column("col2", "", 2)); + assert !digest.equals(ColumnFamily.digest(cf)); + + digest = ColumnFamily.digest(cf); + cf.addColumn(column("col1", "", 3)); + assert !digest.equals(ColumnFamily.digest(cf)); + + digest = ColumnFamily.digest(cf); + cf.delete(new DeletionTime(4, 4)); + assert !digest.equals(ColumnFamily.digest(cf)); + + digest = ColumnFamily.digest(cf); + cf.delete(tombstone("col1", "col11", 5, 5)); + assert !digest.equals(ColumnFamily.digest(cf)); + + digest = ColumnFamily.digest(cf); 
+ assert digest.equals(ColumnFamily.digest(cf)); + + cf.delete(tombstone("col2", "col21", 5, 5)); + assert !digest.equals(ColumnFamily.digest(cf)); + + digest = ColumnFamily.digest(cf); + cf.delete(tombstone("col1", "col11", 5, 5)); // this does not change RangeTombstoneLList + assert digest.equals(ColumnFamily.digest(cf)); + } + + @Test public void testTimestamp() { ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2e74354/test/unit/org/apache/cassandra/db/RowTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java index f024c44..3de7fdc 100644 --- a/test/unit/org/apache/cassandra/db/RowTest.java +++ b/test/unit/org/apache/cassandra/db/RowTest.java @@ -20,19 +20,19 @@ package org.apache.cassandra.db; import java.util.Arrays; import java.util.concurrent.TimeUnit; -import org.junit.Test; import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.utils.ByteBufferUtil; +import static org.apache.cassandra.Util.column; +import static org.apache.cassandra.Util.tombstone; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.apache.cassandra.Util.column; - public class RowTest extends SchemaLoader { @Test @@ -48,6 +48,38 @@ public class RowTest extends SchemaLoader ColumnFamily cfDiff = cf1.diff(cf2); assertFalse(cfDiff.hasColumns()); assertEquals(cfDiff.deletionInfo(), delInfo); + + RangeTombstone tombstone1 = tombstone("1", "11", (long) 123, 123); + RangeTombstone tombstone1_2 = tombstone("111", "112", (long) 1230, 123); + RangeTombstone tombstone2_1 = tombstone("2", "22", (long) 123, 123); + RangeTombstone 
tombstone2_2 = tombstone("2", "24", (long) 123, 123); + RangeTombstone tombstone3_1 = tombstone("3", "31", (long) 123, 123); + RangeTombstone tombstone3_2 = tombstone("3", "31", (long) 1230, 123); + RangeTombstone tombstone4_1 = tombstone("4", "41", (long) 123, 123); + RangeTombstone tombstone4_2 = tombstone("4", "41", (long) 123, 1230); + RangeTombstone tombstone5_2 = tombstone("5", "51", (long) 123, 1230); + cf1.delete(tombstone1); + cf1.delete(tombstone2_1); + cf1.delete(tombstone3_1); + cf1.delete(tombstone4_1); + + cf2.delete(tombstone1); + cf2.delete(tombstone1_2); + cf2.delete(tombstone2_2); + cf2.delete(tombstone3_2); + cf2.delete(tombstone4_2); + cf2.delete(tombstone5_2); + + cfDiff = cf1.diff(cf2); + assertEquals(0, cfDiff.getColumnCount()); + + // only tmbstones which differ in superset or have more recent timestamp to be in diff + delInfo.add(tombstone1_2, cf1.getComparator()); + delInfo.add(tombstone2_2, cf1.getComparator()); + delInfo.add(tombstone3_2, cf1.getComparator()); + delInfo.add(tombstone5_2, cf1.getComparator()); + + assertEquals(delInfo, cfDiff.deletionInfo()); } @Test
