This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0debd8  In JVM test for repairs on token boundaries
a0debd8 is described below

commit a0debd8f9e8636b69d5ed1752f8e9b1b3c664954
Author: Chris Lohfink <[email protected]>
AuthorDate: Tue Feb 4 09:59:25 2020 -0800

    In JVM test for repairs on token boundaries
    
    patch by Chris Lohfink; reviewed by David Capwell for CASSANDRA-15542
---
 .../apache/cassandra/dht/Murmur3Partitioner.java   |  13 ++
 .../org/apache/cassandra/utils/ByteBufferUtil.java |   2 +
 .../org/apache/cassandra/utils/MurmurHash.java     | 113 ++++++++++++-
 .../distributed/test/RepairBoundaryTest.java       | 182 +++++++++++++++++++++
 .../cassandra/dht/Murmur3PartitionerTest.java      |  15 ++
 5 files changed, 322 insertions(+), 3 deletions(-)

diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 52d0efb..2856f13 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.utils.ObjectSizes;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 
 /**
@@ -207,6 +208,18 @@ public class Murmur3Partitioner implements IPartitioner
         {
             return new LongToken(token + 1);
         }
+
+        /**
+         * Inverts murmur3 to build one 16-byte key whose token is {@code token}; the second hash half is chosen as 0.
+         */
+        @VisibleForTesting
+        public static ByteBuffer keyForToken(LongToken token)
+        {
+            ByteBuffer result = ByteBuffer.allocate(16);
+            long[] inv = MurmurHash.inv_hash3_x64_128(new long[] {token.token, 0L});
+            result.putLong(inv[0]).putLong(inv[1]).position(0);
+            return result;
+        }
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index ff3fb3d..5300d9d 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -535,6 +535,8 @@ public class ByteBufferUtil
             return ByteBufferUtil.bytes((InetAddress) obj);
         else if (obj instanceof String)
             return ByteBufferUtil.bytes((String) obj);
+        else if (obj instanceof ByteBuffer)
+            return (ByteBuffer) obj; // returned as-is (not copied), so the caller shares its position/limit
         else
             throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
                                                               obj,
diff --git a/src/java/org/apache/cassandra/utils/MurmurHash.java b/src/java/org/apache/cassandra/utils/MurmurHash.java
index c02fdcc..80cf5cd 100644
--- a/src/java/org/apache/cassandra/utils/MurmurHash.java
+++ b/src/java/org/apache/cassandra/utils/MurmurHash.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.utils;

 import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import com.google.common.primitives.Longs;

 /**
  * This is a very fast, non-cryptographic hash suitable for general hash-based
@@ -146,7 +149,7 @@ public class MurmurHash
         return h64;
     }
 
-    protected static long getblock(ByteBuffer key, int offset, int index)
+    protected static long getBlock(ByteBuffer key, int offset, int index)
     {
         int i_8 = index << 3;
         int blockOffset = offset + i_8;
@@ -187,8 +190,8 @@ public class MurmurHash

         for(int i = 0; i < nblocks; i++)
         {
-            long k1 = getblock(key, offset, i*2+0);
-            long k2 = getblock(key, offset, i*2+1);
+            long k1 = getBlock(key, offset, i * 2 + 0);
+            long k2 = getBlock(key, offset, i * 2 + 1);

             k1 *= c1; k1 = rotl64(k1,31); k1 *= c2; h1 ^= k1;

@@ -248,4 +251,108 @@ public class MurmurHash
         result[1] = h2;
     }
 
+    protected static long invRotl64(long v, int n)
+    {
+        return ((v >>> n) | (v << (64 - n))); // i.e. rotate right by n, undoing rotl64(v, n)
+    }
+
+    protected static long invRShiftXor(long value, int shift)
+    {
+        long output = 0; // recovers x from value == x ^ (x >>> shift), shift bits at a time from the top
+        long i = 0;
+        while (i * shift < 64)
+        {
+            long c = (0xffffffffffffffffL << (64 - shift)) >>> (shift * i);
+            long partOutput = value & c;
+            value ^= partOutput >>> shift;
+            output |= partOutput;
+            i += 1;
+        }
+        return output;
+    }
+
+    protected static long invFmix(long k)
+    {
+        k = invRShiftXor(k, 33);
+        k *= 0x9cb4b2f8129337dbL; // inverse of 0xc4ceb9fe1a85ec53 mod 2^64
+        k = invRShiftXor(k, 33);
+        k *= 0x4f74430c22a54005L; // inverse of 0xff51afd7ed558ccd mod 2^64
+        k = invRShiftXor(k, 33);
+        return k;
+    }
+
+    /**
+     * Reverses the tail byte flip: for each negative byte (lowest byte first) every higher byte is flipped back.
+     * Needed to invert a key whose length is not a multiple of 16, or to target a hash for a given schema.
+     */
+    public static long invTailReverse(long num)
+    {
+        byte[] v = Longs.toByteArray(Long.reverseBytes(num));
+        for (int i = 0; i < 8; i++)
+        {
+            if (v[i] < 0 && i < 7)
+            {
+                BitSet bits = BitSet.valueOf(v);
+                bits.flip(8 * (i + 1), 64);
+                v = java.util.Arrays.copyOf(bits.toByteArray(), 8); // toByteArray() drops trailing zero bytes
+            }
+        }
+        return Longs.fromByteArray(v);
+    }
+
+    public static long[] inv_hash3_x64_128(long[] result)
+    {
+        long c1 = 0xa98409e882ce4d7dL; // inverse of hash3's c1 (0x87c37b91114253d5) mod 2^64
+        long c2 = 0xa81e14edd9de2c7fL; // inverse of hash3's c2 (0x4cf5ad432745937f) mod 2^64
+
+        long k1 = 0;
+        long k2 = 0;
+        long h1 = result[0];
+        long h2 = result[1];
+
+        //----------
+        // reverse finalization
+        h2 -= h1;
+        h1 -= h2;
+
+        h1 = invFmix(h1);
+        h2 = invFmix(h2);
+
+        h2 -= h1;
+        h1 -= h2;
+
+        h1 ^= 16;
+        h2 ^= 16;
+
+        //----------
+        // reverse body
+        h2 -= 0x38495ab5;
+        h2 *= 0xcccccccccccccccdL; // inverse of 5 mod 2^64
+        h2 -= h1;
+        h2 = invRotl64(h2, 31);
+        k2 = h2;
+        h2 = 0;
+
+        k2 *= c1;
+        k2 = invRotl64(k2, 33);
+        k2 *= c2;
+
+        h1 -= 0x52dce729;
+        h1 *= 0xcccccccccccccccdL; // inverse of 5 mod 2^64
+        // no "h1 -= h2" step: h2 was still 0 at this point of the forward pass (zero seed assumed)
+        h1 = invRotl64(h1, 27);
+
+        k1 = h1;
+
+        k1 *= c2;
+        k1 = invRotl64(k1, 31);
+        k1 *= c1;
+
+        // only inverts a full 16-byte body block; inverting a key with a tail also requires invTailReverse
+        k1 = Long.reverseBytes(k1);
+        k2 = Long.reverseBytes(k2);
+
+        return new long[] {k1, k2};
+    }
+
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairBoundaryTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairBoundaryTest.java
new file mode 100644
index 0000000..d269bef
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairBoundaryTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.dht.Murmur3Partitioner.*;
+import static org.apache.cassandra.dht.Murmur3Partitioner.LongToken.keyForToken;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public class RepairBoundaryTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    private static final String INSERT = withKeyspace("INSERT INTO %s.test (k, c1, c2) VALUES" +
+                                                      "(?, 'C1', ?);");
+
+    private static final String DELETE = withKeyspace("DELETE FROM %s.test WHERE k = ?;");
+    private static final String ALL = withKeyspace("SELECT c2 FROM %s.test;");
+
+    private final Map<Integer, ByteBuffer> keys = new HashMap<>();
+
+    private Object[][] c2Row(int... values)
+    {
+        Object[][] ret = new Object[values.length][];
+        for (int i = 0; i < values.length; i++)
+        {
+            ret[i] = new Object[]{ String.valueOf(values[i]) };
+        }
+        return ret;
+    }
+
+    void delete(IInvokableInstance instance, int... toDelete)
+    {
+        for (int key : toDelete)
+        {
+            instance.executeInternal(DELETE, keys.get(key));
+        }
+    }
+
+    void verify()
+    {
+        assertRows(c2Row(999, 1000, 2001, 2999, 3000, 3001), cluster.get(1).executeInternal(ALL));
+        assertRows(c2Row(999, 1000, 1001, 1999, 2000, 3001), cluster.get(2).executeInternal(ALL));
+        assertRows(c2Row(1001, 1999, 2000, 2001, 2999, 3000), cluster.get(3).executeInternal(ALL));
+    }
+
+    /**
+     * Insert a key on each node's token (1000, 2000, 3000) and on the tokens one below and one above it.
+     */
+    void populate()
+    {
+        try
+        {
+            cluster.schemaChange(withKeyspace("DROP TABLE IF EXISTS %s.test;"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k blob, c1 text, c2 text," +
+                                              "PRIMARY KEY (k))"));
+
+            for (int i = 1000; i <= 3000; i += 1000)
+            {
+                keys.put(i, keyForToken(new LongToken(i)));
+                keys.put(i - 1, keyForToken(new LongToken(i - 1)));
+                keys.put(i + 1, keyForToken(new LongToken(i + 1)));
+
+                cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, keys.get(i), String.valueOf(i));
+                cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, keys.get(i - 1), String.valueOf(i - 1));
+                cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, keys.get(i + 1), String.valueOf(i + 1));
+            }
+        }
+        catch (Throwable t)
+        {
+            cluster.close();
+            throw t;
+        }
+    }
+
+    @Test
+    public void primaryRangeRepair()
+    {
+        populate();
+        verify();
+
+        delete(cluster.get(1), 999, 1000, 3001);
+        delete(cluster.get(2), 1999, 2000, 1001);
+        delete(cluster.get(3), 2999, 3000, 2001);
+
+        cluster.forEach(i -> {
+            i.nodetoolResult("repair", "-pr", "--full", KEYSPACE).asserts().success();
+        });
+
+        assertRows(c2Row(), cluster.get(1).executeInternal(ALL));
+        assertRows(c2Row(), cluster.get(2).executeInternal(ALL));
+        assertRows(c2Row(), cluster.get(3).executeInternal(ALL));
+    }
+
+    @Test
+    public void singleTokenRangeRepair()
+    {
+        populate();
+        verify();
+
+        delete(cluster.get(1), 999, 1000);
+        delete(cluster.get(3), 1001);
+
+        cluster.get(2).runOnInstance(() -> {
+            try
+            {
+                Map<String, String> options = new HashMap<>();
+                options.put("ranges", "999:1000");
+                options.put("incremental", "false");
+                SimpleCondition await = new SimpleCondition();
+                StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
+                    if (event.getType() == ProgressEventType.COMPLETE)
+                        await.signalAll();
+                })).right.get();
+                await.await(1L, MINUTES);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e); // a failed repair must fail the test, not be silently ignored
+            }
+        });
+
+        assertRows(c2Row(999, 1001, 1999, 2000, 3001), cluster.get(2).executeInternal(ALL));
+    }
+
+    @BeforeClass
+    public static void init() throws IOException
+    {
+        cluster = Cluster.build(3)
+                         .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                     .set("commitlog_sync_batch_window_in_ms", 5)
+                                                     .set("num_tokens", 1)
+                                                     .set("initial_token", Long.toString(config.num() * 1000))
+                                                     .with(NETWORK)
+                                                     .with(GOSSIP))
+                         .start();
+        cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " +
+                                          "{'class': 'SimpleStrategy', 'replication_factor': 2};"));
+    }
+
+    @AfterClass
+    public static void closeCluster()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
index 83f3dda..9a02fb0 100644
--- a/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
+++ b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
@@ -17,8 +17,13 @@
  */
 package org.apache.cassandra.dht;
 
+import java.nio.ByteBuffer;
+
 import org.junit.Test;
 
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.longs;
+
 public class Murmur3PartitionerTest extends PartitionerTestCase
 {
     public void initPartitioner()
@@ -62,5 +67,15 @@ public class Murmur3PartitionerTest extends PartitionerTestCase
         Murmur3Partitioner.LongToken left = new Murmur3Partitioner.LongToken(Long.MAX_VALUE - 100);
         assertSplit(left, tok("a"), 16);
     }
+
+    @Test
+    public void testLongTokenInverse()
+    {
+        qt().forAll(longs().between(Long.MIN_VALUE + 1, Long.MAX_VALUE)) // MIN_VALUE excluded; presumably never a token — confirm
+            .check(token -> {
+                ByteBuffer key = Murmur3Partitioner.LongToken.keyForToken(new Murmur3Partitioner.LongToken(token));
+                return Murmur3Partitioner.instance.getToken(key).token == token;
+            });
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to