Repository: cassandra Updated Branches: refs/heads/trunk d0e2b9ca9 -> 68b81372c
Use murmur3 for validation compactions Patch by marcuse; reviewed by Michael Kjellman for CASSANDRA-14002 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68b81372 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68b81372 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68b81372 Branch: refs/heads/trunk Commit: 68b81372cd838808b304d58677cdc86f6ec35ffa Parents: d0e2b9c Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Nov 6 16:40:58 2017 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Mar 5 08:20:38 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/repair/Validator.java | 92 ++++++++++++++++---- .../apache/cassandra/repair/ValidatorTest.java | 43 +++++++++ 3 files changed, 120 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/68b81372/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f52aceb..9d0b483 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Use Murmur3 for validation compactions (CASSANDRA-14002) * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285) * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058) * Add optional startup delay to wait until peers are ready (CASSANDRA-13993) http://git-wip-us.apache.org/repos/asf/cassandra/blob/68b81372/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index 4c2856d..28c910b 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -26,6 +26,7 @@ import java.util.Random; import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Funnel; import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -192,80 +193,127 @@ public class Validator implements Runnable return range.contains(t); } + /** + * Hasher that concatenates the hash code from 2 hash functions (murmur3_128) with different + * seeds and counts the number of bytes we hashed. + * + * Everything hashed by this class is hashed by both hash functions and the + * resulting hashcode is a concatenation of the output bytes from each. + * + * Idea from Guavas Hashing.ConcatenatedHashFunction, but that is package-private so we can't use it + */ + @VisibleForTesting static class CountingHasher implements Hasher { + @VisibleForTesting + static final HashFunction[] hashFunctions = new HashFunction[2]; + + static + { + for (int i = 0; i < hashFunctions.length; i++) + hashFunctions[i] = Hashing.murmur3_128(i * 1000); + } private long count; - private final Hasher underlying; + private final int bits; + private final Hasher[] underlying = new Hasher[2]; - CountingHasher(Hasher underlying) + CountingHasher() { - this.underlying = underlying; + int bits = 0; + for (int i = 0; i < underlying.length; i++) + { + this.underlying[i] = hashFunctions[i].newHasher(); + bits += hashFunctions[i].bits(); + } + this.bits = bits; } public Hasher putByte(byte b) { count += 1; - return underlying.putByte(b); + for (Hasher h : underlying) + h.putByte(b); + return this; } public Hasher putBytes(byte[] bytes) { count += bytes.length; - return underlying.putBytes(bytes); + for (Hasher h : underlying) + h.putBytes(bytes); + return this; } public Hasher putBytes(byte[] bytes, int offset, int length) { count += length; - return underlying.putBytes(bytes, offset, length); + for (Hasher h : underlying) + h.putBytes(bytes, offset, length); + return this; } public Hasher putBytes(ByteBuffer byteBuffer) { count += byteBuffer.remaining(); - return underlying.putBytes(byteBuffer); + for (Hasher h : underlying) + h.putBytes(byteBuffer.duplicate()); + return this; } public Hasher putShort(short i) { count += Short.BYTES; - return underlying.putShort(i); + for (Hasher h : underlying) + h.putShort(i); + return this; } public Hasher putInt(int i) { count += Integer.BYTES; - return underlying.putInt(i); + for (Hasher h : underlying) + h.putInt(i); + return this; } public Hasher putLong(long l) { count += Long.BYTES; - return underlying.putLong(l); + for (Hasher h : underlying) + h.putLong(l); + return this; } public Hasher putFloat(float v) { count += Float.BYTES; - return underlying.putFloat(v); + for (Hasher h : underlying) + h.putFloat(v); + return this; } public Hasher putDouble(double v) { count += Double.BYTES; - return underlying.putDouble(v); + for (Hasher h : underlying) + h.putDouble(v); + return this; } public Hasher putBoolean(boolean b) { count += Byte.BYTES; - return underlying.putBoolean(b); + for (Hasher h : underlying) + h.putBoolean(b); + return this; } public Hasher putChar(char c) { count += Character.BYTES; - return underlying.putChar(c); + for (Hasher h : underlying) + h.putChar(c); + return this; } public Hasher putUnencodedChars(CharSequence charSequence) @@ -285,7 +333,19 @@ public class Validator implements Runnable public HashCode hash() { - return underlying.hash(); + byte[] res = new byte[bits / 8]; + int i = 0; + for (Hasher hasher : underlying) + { + HashCode newHash = hasher.hash(); + i += newHash.writeBytesTo(res, i, newHash.bits() / 8); + } + return HashCode.fromBytes(res); + } + + public long getCount() + { + return count; } } @@ -293,7 +353,7 @@ public class Validator implements Runnable { validated++; // MerkleTree uses XOR internally, so we want lots of output bits here - CountingHasher hasher = new CountingHasher(Hashing.sha256().newHasher()); + CountingHasher hasher = new CountingHasher(); UnfilteredRowIterators.digest(partition, hasher, MessagingService.current_version); // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979 return hasher.count > 0 http://git-wip-us.apache.org/repos/asf/cassandra/blob/68b81372/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index 322772a..7c3dd27 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.repair; +import java.net.InetAddress; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -25,6 +27,8 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import com.google.common.hash.Hasher; + import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.CompactionsTest; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -213,6 +217,45 @@ public class ValidatorTest assertEquals(trees.rowCount(), n); } + @Test + public void testCountingHasher() + { + Hasher [] hashers = new Hasher[] {new Validator.CountingHasher(), Validator.CountingHasher.hashFunctions[0].newHasher(), Validator.CountingHasher.hashFunctions[1].newHasher() }; + byte [] random = UUIDGen.getTimeUUIDBytes(); + + // call all overloaded methods: + for (Hasher hasher : hashers) + { + hasher.putByte((byte) 33) + .putBytes(random) + .putBytes(ByteBuffer.wrap(random)) + .putBytes(random, 0, 3) + .putChar('a') + .putBoolean(false) + .putDouble(3.3) + .putInt(77) + .putFloat(99) + .putLong(101) + .putShort((short) 23); + } + + long len = Byte.BYTES + + random.length * 2 // both the byte[] and the ByteBuffer + + 3 // 3 bytes from the random byte[] + + Character.BYTES + + Byte.BYTES + + Double.BYTES + + Integer.BYTES + + Float.BYTES + + Long.BYTES + + Short.BYTES; + + byte [] h = hashers[0].hash().asBytes(); + assertTrue(Arrays.equals(hashers[1].hash().asBytes(), Arrays.copyOfRange(h, 0, 16))); + assertTrue(Arrays.equals(hashers[2].hash().asBytes(), Arrays.copyOfRange(h, 16, 32))); + assertEquals(len, ((Validator.CountingHasher)hashers[0]).getCount()); + } + private CompletableFuture<MessageOut> registerOutgoingMessageSink() { final CompletableFuture<MessageOut> future = new CompletableFuture<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org