This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a0debd8 In JVM test for repairs on token boundaries
a0debd8 is described below
commit a0debd8f9e8636b69d5ed1752f8e9b1b3c664954
Author: Chris Lohfink <[email protected]>
AuthorDate: Tue Feb 4 09:59:25 2020 -0800
In JVM test for repairs on token boundaries
patch by Chris Lohfink; reviewed by David Capwell for CASSANDRA-15542
---
.../apache/cassandra/dht/Murmur3Partitioner.java | 13 ++
.../org/apache/cassandra/utils/ByteBufferUtil.java | 2 +
.../org/apache/cassandra/utils/MurmurHash.java | 113 ++++++++++++-
.../distributed/test/RepairBoundaryTest.java | 182 +++++++++++++++++++++
.../cassandra/dht/Murmur3PartitionerTest.java | 15 ++
5 files changed, 322 insertions(+), 3 deletions(-)
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 52d0efb..2856f13 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MurmurHash;
import org.apache.cassandra.utils.ObjectSizes;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
/**
@@ -207,6 +208,18 @@ public class Murmur3Partitioner implements IPartitioner
{
return new LongToken(token + 1);
}
+
+        /**
+         * Reverses murmur3 to find a 16 byte key that generates the given token.
+         * <p>
+         * Asks {@link MurmurHash#inv_hash3_x64_128} for the single 16 byte block whose 128 bit hash is
+         * {@code {token.token, 0}}; presumably only the first 64 bits of the hash determine the token, so the
+         * second half is just a fixed choice.
+         * NOTE(review): the round-trip unit test only covers tokens above {@code Long.MIN_VALUE}; confirm whether
+         * the minimum token can be produced this way.
+         *
+         * @param token the token the returned key should hash to
+         * @return a new heap buffer holding the 16 byte key, positioned at 0 with its limit at 16
+         */
+        @VisibleForTesting
+        public static ByteBuffer keyForToken(LongToken token)
+        {
+            long[] blocks = MurmurHash.inv_hash3_x64_128(new long[]{ token.token, 0L });
+            ByteBuffer key = ByteBuffer.allocate(2 * Long.BYTES);
+            key.putLong(blocks[0]);
+            key.putLong(blocks[1]);
+            key.flip(); // position -> 0, limit -> 16: ready to be read from the start
+            return key;
+        }
}
/**
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index ff3fb3d..5300d9d 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -535,6 +535,8 @@ public class ByteBufferUtil
return ByteBufferUtil.bytes((InetAddress) obj);
else if (obj instanceof String)
return ByteBufferUtil.bytes((String) obj);
+ else if (obj instanceof ByteBuffer)
+ return (ByteBuffer) obj;
else
throw new IllegalArgumentException(String.format("Cannot convert
value %s of type %s",
obj,
diff --git a/src/java/org/apache/cassandra/utils/MurmurHash.java
b/src/java/org/apache/cassandra/utils/MurmurHash.java
index c02fdcc..80cf5cd 100644
--- a/src/java/org/apache/cassandra/utils/MurmurHash.java
+++ b/src/java/org/apache/cassandra/utils/MurmurHash.java
@@ -18,6 +18,9 @@
package org.apache.cassandra.utils;
import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import com.google.common.primitives.Longs;
/**
* This is a very fast, non-cryptographic hash suitable for general hash-based
@@ -146,7 +149,7 @@ public class MurmurHash
return h64;
}
- protected static long getblock(ByteBuffer key, int offset, int index)
+ protected static long getBlock(ByteBuffer key, int offset, int index)
{
int i_8 = index << 3;
int blockOffset = offset + i_8;
@@ -187,8 +190,8 @@ public class MurmurHash
for(int i = 0; i < nblocks; i++)
{
- long k1 = getblock(key, offset, i*2+0);
- long k2 = getblock(key, offset, i*2+1);
+ long k1 = getBlock(key, offset, i * 2 + 0);
+ long k2 = getBlock(key, offset, i * 2 + 1);
k1 *= c1; k1 = rotl64(k1,31); k1 *= c2; h1 ^= k1;
@@ -248,4 +251,108 @@ public class MurmurHash
result[1] = h2;
}
+    /**
+     * Undoes a 64 bit left rotation by {@code n} bits, i.e. rotates {@code v} right by {@code n} bits.
+     */
+    protected static long invRotl64(long v, int n)
+    {
+        return Long.rotateRight(v, n);
+    }
+
+    /**
+     * Inverts {@code x ^ (x >>> shift)}: given that value, recovers {@code x}.
+     * <p>
+     * The top {@code shift} bits pass through the forward step unchanged; each recovered group of bits is then
+     * used to strip the xor from the next {@code shift} bits below it, working from the top down.
+     * {@code shift} must be positive (in this file it is always 33); for {@code shift <= 0} the loop does not
+     * terminate in practice.
+     */
+    protected static long invRShiftXor(long value, int shift)
+    {
+        long pending = value;   // bits that may still carry the forward xor
+        long recovered = 0;
+        for (long step = 0; step * shift < 64; step++)
+        {
+            long window = (-1L << (64 - shift)) >>> (step * shift);
+            long bits = pending & window;
+            pending ^= bits >>> shift;
+            recovered |= bits;
+        }
+        return recovered;
+    }
+
+    /**
+     * Inverts the 64 bit finalization mix: undoes three xor-shift-by-33 steps and the two multiplications
+     * between them, in reverse order.
+     * NOTE(review): the two constants are presumably the multiplicative inverses (mod 2^64) of the forward
+     * fmix multipliers, applied last-to-first; confirm against the forward hash.
+     */
+    protected static long invFmix(long k)
+    {
+        long afterLastMultiply = invRShiftXor(k, 33) * 0x9cb4b2f8129337dbL;
+        long afterFirstMultiply = invRShiftXor(afterLastMultiply, 33) * 0x4f74430c22a54005L;
+        return invRShiftXor(afterFirstMultiply, 33);
+    }
+
+    /**
+     * Correctly reverses the tail byte flip. This is needed to invert a hash for a key whose length is not a
+     * multiple of 16, or to target a hash for a given schema.
+     * <p>
+     * When a tail byte at position {@code i} is negative, every bit above it (bits {@code 8(i+1)..63}) ends up
+     * flipped (presumably the sign extension of the byte in the forward hash). Walking the bytes from least to
+     * most significant and flipping those bits back recovers the original bytes.
+     *
+     * @param num the accumulated tail value, byte {@code i} stored in bits {@code 8i..8i+7}
+     * @return the recovered tail bytes packed big-endian, i.e. byte-reversed relative to {@code num}
+     */
+    public static long invTailReverse(long num)
+    {
+        // Work on the long directly: a BitSet round trip (toByteArray) drops high zero bytes, which shrank the
+        // 8 byte array and made indexing/Longs.fromByteArray fail whenever a flip cleared the upper bytes.
+        long tail = num;
+        // byte 7 has no bits above it, so only bytes 0..6 can have flipped the higher bits
+        for (int i = 0; i < 7; i++)
+        {
+            boolean negative = ((tail >>> (8 * i)) & 0x80) != 0;
+            if (negative)
+                tail ^= -1L << (8 * (i + 1));
+        }
+        return Long.reverseBytes(tail);
+    }
+
+    /**
+     * Inverts the x64 128 bit murmur3 hash for a single 16 byte block: given a desired hash
+     * {@code {result[0], result[1]}}, returns two longs that, written big-endian, form a 16 byte key with that hash.
+     * <p>
+     * The inversion assumes the key is exactly 16 bytes (the length xor below) and that both hash lanes are 0
+     * before the block is mixed in (NOTE(review): i.e. a zero seed; confirm against the forward hash).
+     * It only handles whole body blocks; a key with a partial tail would also need {@link #invTailReverse}.
+     * {@code result} is only read, never modified.
+     *
+     * @param result the target hash; only elements 0 and 1 are read
+     * @return the two 8 byte key blocks, byte-reversed so they can be written with a big-endian put
+     */
+    public static long[] inv_hash3_x64_128(long[] result)
+    {
+        // presumably the multiplicative inverses (mod 2^64) of the forward block constants; note they are
+        // applied in the opposite order to the forward mixing
+        long c1 = 0xa98409e882ce4d7dL;
+        long c2 = 0xa81e14edd9de2c7fL;
+
+        long h1 = result[0];
+        long h2 = result[1];
+
+        //----------
+        // reverse finalization: lane mixing, fmix, lane mixing, length xor
+        h2 -= h1;
+        h1 -= h2;
+
+        h1 = invFmix(h1);
+        h2 = invFmix(h2);
+
+        h2 -= h1;
+        h1 -= h2;
+
+        // the hashed key is exactly 16 bytes long
+        h1 ^= 16;
+        h2 ^= 16;
+
+        //----------
+        // reverse body, second lane: undo h2 = (rotl(h2 ^ k2, 31) + h1) * 5 + 0x38495ab5
+        h2 -= 0x38495ab5;
+        h2 *= 0xcccccccccccccccdL; // multiplicative inverse of 5 mod 2^64
+        h2 -= h1;
+        // the lane was 0 before the block, so after un-rotating only k2 itself remains
+        long k2 = invRotl64(h2, 31);
+
+        k2 *= c1;
+        k2 = invRotl64(k2, 33);
+        k2 *= c2;
+
+        // reverse body, first lane: undo h1 = (rotl(h1 ^ k1, 27) + h2) * 5 + 0x52dce729; the h2 added there is
+        // the pre-block value 0, so there is nothing to subtract before un-rotating
+        h1 -= 0x52dce729;
+        h1 *= 0xcccccccccccccccdL;
+        long k1 = invRotl64(h1, 27);
+
+        k1 *= c2;
+        k1 = invRotl64(k1, 31);
+        k1 *= c1;
+
+        // byte-swap so a big-endian write yields the key bytes (NOTE(review): presumably getBlock reads blocks
+        // little-endian). This works for body blocks; reversing a tail requires `invTailReverse`.
+        return new long[]{ Long.reverseBytes(k1), Long.reverseBytes(k2) };
+    }
+
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/RepairBoundaryTest.java
b/test/distributed/org/apache/cassandra/distributed/test/RepairBoundaryTest.java
new file mode 100644
index 0000000..d269bef
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/RepairBoundaryTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.dht.Murmur3Partitioner.*;
+import static
org.apache.cassandra.dht.Murmur3Partitioner.LongToken.keyForToken;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public class RepairBoundaryTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    private static final String INSERT = withKeyspace("INSERT INTO %s.test (k, c1, c2) VALUES" +
+                                                      "(?, 'C1', ?);");
+
+    private static final String DELETE = withKeyspace("DELETE FROM %s.test WHERE k = ?;");
+    private static final String ALL = withKeyspace("SELECT c2 FROM %s.test;");
+
+    /** Blob keys indexed by the token they hash to; filled in by {@link #populate()}. */
+    private final Map<Integer, ByteBuffer> keys = new HashMap<>();
+
+    /**
+     * Builds the expected result of {@link #ALL}: one single-column row holding {@code String.valueOf(value)}
+     * for each given value, in the given order.
+     */
+    private Object[][] c2Row(int... values)
+    {
+        Object[][] ret = new Object[values.length][];
+        for (int i = 0; i < values.length; i++)
+        {
+            ret[i] = new Object[]{ String.valueOf(values[i]) };
+        }
+        return ret;
+    }
+
+    /**
+     * Deletes the rows for the given tokens through {@code executeInternal} on the given node
+     * (presumably local to that node only, with no replication to the other replicas).
+     */
+    void delete(IInvokableInstance instance, int... toDelete)
+    {
+        for (int key : toDelete)
+        {
+            instance.executeInternal(DELETE, keys.get(key));
+        }
+    }
+
+    /** Asserts which rows each node holds locally right after {@link #populate()}. */
+    void verify()
+    {
+        assertRows(c2Row(999, 1000, 2001, 2999, 3000, 3001), cluster.get(1).executeInternal(ALL));
+        assertRows(c2Row(999, 1000, 1001, 1999, 2000, 3001), cluster.get(2).executeInternal(ALL));
+        assertRows(c2Row(1001, 1999, 2000, 2001, 2999, 3000), cluster.get(3).executeInternal(ALL));
+    }
+
+    /**
+     * Recreates the test table and inserts, at CL ALL, one row whose key hashes to each node token
+     * (1000, 2000, 3000) and one each for the tokens just below and just above it.
+     * The cluster is closed if this fails.
+     */
+    void populate()
+    {
+        try
+        {
+            cluster.schemaChange(withKeyspace("DROP TABLE IF EXISTS %s.test;"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k blob, c1 text, c2 text," +
+                                              "PRIMARY KEY (k))"));
+
+            for (int i = 1000; i <= 3000; i += 1000)
+            {
+                keys.put(i, keyForToken(new LongToken(i)));
+                keys.put(i - 1, keyForToken(new LongToken(i - 1)));
+                keys.put(i + 1, keyForToken(new LongToken(i + 1)));
+
+                cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, keys.get(i), String.valueOf(i));
+                cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, keys.get(i - 1), String.valueOf(i - 1));
+                cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, keys.get(i + 1), String.valueOf(i + 1));
+            }
+        }
+        catch (Throwable t)
+        {
+            cluster.close();
+            throw t;
+        }
+    }
+
+    /**
+     * Each node locally deletes the three keys of the range ending at its own token (e.g. node 1: 3001, 999,
+     * 1000), then every node runs a full primary-range repair; afterwards no node should hold any row.
+     */
+    @Test
+    public void primaryRangeRepair()
+    {
+        populate();
+        verify();
+
+        delete(cluster.get(1), 999, 1000, 3001);
+        delete(cluster.get(2), 1999, 2000, 1001);
+        delete(cluster.get(3), 2999, 3000, 2001);
+
+        cluster.forEach(i -> {
+            i.nodetoolResult("repair", "-pr", "--full", KEYSPACE).asserts().success();
+        });
+
+        assertRows(c2Row(), cluster.get(1).executeInternal(ALL));
+        assertRows(c2Row(), cluster.get(2).executeInternal(ALL));
+        assertRows(c2Row(), cluster.get(3).executeInternal(ALL));
+    }
+
+    /**
+     * Deletes keys 999 and 1000 on node 1 and 1001 on node 3, then repairs only the range 999:1000 from node 2.
+     * Node 2 should lose key 1000 and keep 999 and 1001, which lie outside the repaired range.
+     */
+    @Test
+    public void singleTokenRangeRepair()
+    {
+        populate();
+        verify();
+
+        delete(cluster.get(1), 999, 1000);
+        delete(cluster.get(3), 1001);
+
+        cluster.get(2).runOnInstance(() -> {
+            try
+            {
+                Map<String, String> options = new HashMap<>();
+                options.put("ranges", "999:1000");
+                options.put("incremental", "false");
+                SimpleCondition await = new SimpleCondition();
+                StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
+                    if (event.getType() == ProgressEventType.COMPLETE)
+                        await.signalAll();
+                })).right.get();
+                // a timeout must fail the test instead of silently asserting on partially repaired data
+                if (!await.await(1L, MINUTES))
+                    throw new AssertionError("Repair of range 999:1000 did not complete within 1 minute");
+            }
+            catch (Exception e)
+            {
+                // previously swallowed: a failed repair would only surface as a confusing row mismatch below
+                throw new RuntimeException("Repair of range 999:1000 failed", e);
+            }
+        });
+
+        assertRows(c2Row(999, 1001, 1999, 2000, 3001), cluster.get(2).executeInternal(ALL));
+    }
+
+    /**
+     * Starts a 3 node cluster with a single token per node (1000, 2000, 3000) and creates the keyspace with
+     * SimpleStrategy and RF 2.
+     */
+    @BeforeClass
+    public static void init() throws IOException
+    {
+        cluster = Cluster.build(3)
+                         .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                     .set("commitlog_sync_batch_window_in_ms", 5)
+                                                     .set("num_tokens", 1)
+                                                     .set("initial_token", Long.toString(config.num() * 1000))
+                                                     .with(NETWORK)
+                                                     .with(GOSSIP))
+                         .start();
+        cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " +
+                                          "{'class': 'SimpleStrategy', 'replication_factor': 2};"));
+    }
+
+    @AfterClass
+    public static void closeCluster()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
index 83f3dda..9a02fb0 100644
--- a/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
+++ b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
@@ -17,8 +17,13 @@
*/
package org.apache.cassandra.dht;
+import java.nio.ByteBuffer;
+
import org.junit.Test;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.longs;
+
public class Murmur3PartitionerTest extends PartitionerTestCase
{
public void initPartitioner()
@@ -62,5 +67,15 @@ public class Murmur3PartitionerTest extends
PartitionerTestCase
Murmur3Partitioner.LongToken left = new
Murmur3Partitioner.LongToken(Long.MAX_VALUE - 100);
assertSplit(left, tok("a"), 16);
}
+
+    /**
+     * Property test: for every token other than {@code Long.MIN_VALUE}, the key built by
+     * {@code LongToken.keyForToken} hashes back to that same token.
+     */
+    @Test
+    public void testLongTokenInverse()
+    {
+        qt().forAll(longs().between(Long.MIN_VALUE + 1, Long.MAX_VALUE))
+            .check(Murmur3PartitionerTest::keyHashesBackToToken);
+    }
+
+    /** Returns whether the key generated for {@code token} is mapped back to {@code token} by the partitioner. */
+    private static boolean keyHashesBackToToken(long token)
+    {
+        Murmur3Partitioner.LongToken target = new Murmur3Partitioner.LongToken(token);
+        ByteBuffer key = Murmur3Partitioner.LongToken.keyForToken(target);
+        return Murmur3Partitioner.instance.getToken(key).token == token;
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]