This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f13c86  NPE in Slice#make on RT + partition deletion reconciliation 
on timestamp tie
6f13c86 is described below

commit 6f13c864a02b32daa7696eca27431f5385a306df
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Feb 17 15:28:47 2021 +0100

    NPE in Slice#make on RT + partition deletion reconciliation on timestamp tie
    
    Patch by Alex Petrov; reviewed by Aleksey Yeschenko and Marcus Eriksson for 
CASSANDRA-16453.
---
 src/java/org/apache/cassandra/db/Slice.java        |  2 +
 .../reads/repair/RowIteratorMergeListener.java     | 21 +++++--
 .../cassandra/distributed/test/ReadRepairTest.java | 67 +++++++++++++++++++++-
 3 files changed, 84 insertions(+), 6 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/Slice.java 
b/src/java/org/apache/cassandra/db/Slice.java
index c6f558f..1aaf430 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -71,6 +71,8 @@ public class Slice
 
     public static Slice make(ClusteringBound<?> start, ClusteringBound<?> end)
     {
+        assert start != null && end != null;
+
         if (start.isBottom() && end.isTop())
             return ALL;
 
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
 
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 2341c18..38d077a 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -255,16 +255,16 @@ public class RowIteratorMergeListener<E extends 
Endpoints<E>>
                     continue;
 
                 // We have a close and/or open marker for a source, with 
nothing corresponding in merged.
-                // Because merged is a superset, this imply that we have a 
current deletion (being it due to an
+                // Because merged is a superset, this implies that we have a 
current deletion (either due to an
                 // early opening in merged or a partition level deletion) and 
that this deletion will still be
                 // active after that point. Further whatever deletion was open 
or is open by this marker on the
                 // source, that deletion cannot supersedes the current one.
                 //
                 // But while the marker deletion (before and/or after this 
point) cannot supersede the current
                 // deletion, we want to know if it's equal to it (both before 
and after), because in that case
-                // the source is up to date and we don't want to include 
repair.
+                // the source is up to date and we don't want to include it 
into repair.
                 //
-                // So in practice we have 2 possible case:
+                // So in practice we have 2 possible cases:
                 //  1) the source was up-to-date on deletion up to that point: 
then it won't be from that point
                 //     on unless it's a boundary and the new opened deletion 
time is also equal to the current
                 //     deletion (note that this implies the boundary has the 
same closing and opening deletion
@@ -280,6 +280,7 @@ public class RowIteratorMergeListener<E extends 
Endpoints<E>>
                 // current deletion, this means the current deletion is due to 
a previously open range tombstone,
                 // and if the source isn't currently repaired for that RT, 
then it means it's up to date on it).
                 DeletionTime partitionRepairDeletion = 
partitionLevelRepairDeletion(i);
+
                 if (markerToRepair[i] == null && 
currentDeletion.supersedes(partitionRepairDeletion))
                 {
                     /*
@@ -310,9 +311,19 @@ public class RowIteratorMergeListener<E extends 
Endpoints<E>>
                 // In case 2) above, we only have something to do if the 
source is up-to-date after that point
                 // (which, since the source isn't up-to-date before that 
point, means we're opening a new deletion
                 // that is equal to the current one).
-                else if (marker.isOpen(isReversed) && 
currentDeletion.equals(marker.openDeletionTime(isReversed)))
+                else
                 {
-                    closeOpenMarker(i, marker.openBound(isReversed).invert());
+                    if (markerToRepair[i] == null)
+                    {
+                        // The only way we can have no open RT repair is a partition deletion that has the same
+                        // timestamp as the current deletion and the same local deletion time. In that case, since
+                        // the partition deletion covers the entire partition, we do not include it in the repair.
+                        assert currentDeletion.localDeletionTime() == 
partitionRepairDeletion.localDeletionTime();
+                    }
+                    else if (marker.isOpen(isReversed) && 
currentDeletion.equals(marker.openDeletionTime(isReversed)))
+                    {
+                        closeOpenMarker(i, 
marker.openBound(isReversed).invert());
+                    }
                 }
             }
             else
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
index 1ec192c..c68320e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.distributed.test;
 
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -27,10 +29,17 @@ import java.util.concurrent.Future;
 import org.junit.Assert;
 import org.junit.Test;
 
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
@@ -41,12 +50,16 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.reads.repair.BlockingReadRepair;
 import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
+import static net.bytebuddy.matcher.ElementMatchers.named;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
@@ -440,4 +453,56 @@ public class ReadRepairTest extends TestBaseImpl
             assertEquals("No read repair requests were expected, found " + 
requests, 0, requests);
         }
     }
-}
+
+    @Test
+    public void partitionDeletionRTTimestampTieTest() throws Throwable
+    {
+        try (Cluster cluster = init(builder()
+                                    .withNodes(3)
+                                    .withInstanceInitializer(RRHelper::install)
+                                    .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE 
distributed_test_keyspace.tbl0 (pk bigint,ck bigint,value bigint, PRIMARY KEY 
(pk, ck)) WITH  CLUSTERING ORDER BY (ck ASC) AND read_repair='blocking';"));
+            long pk = 0L;
+            cluster.coordinator(1).execute("INSERT INTO 
distributed_test_keyspace.tbl0 (pk, ck, value) VALUES (?,?,?) USING TIMESTAMP 
1", ConsistencyLevel.ALL, pk, 1L, 1L);
+            cluster.coordinator(1).execute("DELETE FROM 
distributed_test_keyspace.tbl0 USING TIMESTAMP 2 WHERE pk=? AND ck>?;", 
ConsistencyLevel.ALL, pk, 2L);
+            cluster.get(3).executeInternal("DELETE FROM 
distributed_test_keyspace.tbl0 USING TIMESTAMP 2 WHERE pk=?;", pk);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM 
distributed_test_keyspace.tbl0 WHERE pk=? AND ck>=? AND ck<?;",
+                                                      ConsistencyLevel.ALL, 
pk, 1L, 3L));
+        }
+    }
+
+    public static class RRHelper
+    {
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            // Only on coordinating node
+            if (nodeNumber == 1)
+            {
+                new ByteBuddy().rebase(BlockingReadRepair.class)
+                               .method(named("repairPartition"))
+                               .intercept(MethodDelegation.to(RRHelper.class))
+                               .make()
+                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        // This verifies new behaviour in 4.0 that was introduced in CASSANDRA-15369, but did not work
+        // on a timestamp tie between an RT and a partition deletion: we should not generate RT bounds in
+        // such a case, since monotonicity is already ensured by the partition deletion, and the RT is
+        // unnecessary there.
+        // For details, see CASSANDRA-16453.
+        public static Object repairPartition(DecoratedKey partitionKey, 
Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan, 
@SuperCall Callable<Void> r) throws Exception
+        {
+            Assert.assertEquals(2, mutations.size());
+            for (Mutation value : mutations.values())
+            {
+                for (PartitionUpdate update : value.getPartitionUpdates())
+                {
+                    Assert.assertFalse(update.hasRows());
+                    
Assert.assertFalse(update.partitionLevelDeletion().isLive());
+                }
+            }
+            return r.call();
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to