This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f13c86 NPE in Slice#make on RT + partition deletion reconciliation
on timestamp tie
6f13c86 is described below
commit 6f13c864a02b32daa7696eca27431f5385a306df
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Feb 17 15:28:47 2021 +0100
NPE in Slice#make on RT + partition deletion reconciliation on timestamp tie
Patch by Alex Petrov; reviewed by Aleksey Yeschenko and Marcus Eriksson for
CASSANDRA-16453.
---
src/java/org/apache/cassandra/db/Slice.java | 2 +
.../reads/repair/RowIteratorMergeListener.java | 21 +++++--
.../cassandra/distributed/test/ReadRepairTest.java | 67 +++++++++++++++++++++-
3 files changed, 84 insertions(+), 6 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/Slice.java
b/src/java/org/apache/cassandra/db/Slice.java
index c6f558f..1aaf430 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -71,6 +71,8 @@ public class Slice
public static Slice make(ClusteringBound<?> start, ClusteringBound<?> end)
{
+ assert start != null && end != null;
+
if (start.isBottom() && end.isTop())
return ALL;
diff --git
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 2341c18..38d077a 100644
---
a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++
b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -255,16 +255,16 @@ public class RowIteratorMergeListener<E extends
Endpoints<E>>
continue;
// We have a close and/or open marker for a source, with
nothing corresponding in merged.
- // Because merged is a superset, this imply that we have a
current deletion (being it due to an
+ // Because merged is a superset, this implies that we have a
current deletion (either due to an
// early opening in merged or a partition level deletion) and
that this deletion will still be
// active after that point. Further whatever deletion was open
or is open by this marker on the
// source, that deletion cannot supersede the current one.
//
// But while the marker deletion (before and/or after this
point) cannot supersede the current
// deletion, we want to know if it's equal to it (both before
and after), because in that case
- // the source is up to date and we don't want to include
repair.
+ // the source is up to date and we don't want to include it
into repair.
//
- // So in practice we have 2 possible case:
+ // So in practice we have 2 possible cases:
// 1) the source was up-to-date on deletion up to that point:
then it won't be from that point
// on unless it's a boundary and the new opened deletion
time is also equal to the current
// deletion (note that this implies the boundary has the
same closing and opening deletion
@@ -280,6 +280,7 @@ public class RowIteratorMergeListener<E extends
Endpoints<E>>
// current deletion, this means the current deletion is due to
a previously open range tombstone,
// and if the source isn't currently repaired for that RT,
then it means it's up to date on it).
DeletionTime partitionRepairDeletion =
partitionLevelRepairDeletion(i);
+
if (markerToRepair[i] == null &&
currentDeletion.supersedes(partitionRepairDeletion))
{
/*
@@ -310,9 +311,19 @@ public class RowIteratorMergeListener<E extends
Endpoints<E>>
// In case 2) above, we only have something to do if the
source is up-to-date after that point
// (which, since the source isn't up-to-date before that
point, means we're opening a new deletion
// that is equal to the current one).
- else if (marker.isOpen(isReversed) &&
currentDeletion.equals(marker.openDeletionTime(isReversed)))
+ else
{
- closeOpenMarker(i, marker.openBound(isReversed).invert());
+ if (markerToRepair[i] == null)
+ {
+                        // The only way we can have no open RT repair is a
partition deletion that has the same timestamp
+                        // as the deletion and the same local deletion time. In
such a case, since the partition deletion covers
+                        // an entire partition, we do not include it in the
repair.
+ assert currentDeletion.localDeletionTime() ==
partitionRepairDeletion.localDeletionTime();
+ }
+ else if (marker.isOpen(isReversed) &&
currentDeletion.equals(marker.openDeletionTime(isReversed)))
+ {
+ closeOpenMarker(i,
marker.openBound(isReversed).invert());
+ }
}
}
else
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
index 1ec192c..c68320e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.distributed.test;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -27,10 +29,17 @@ import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
@@ -41,12 +50,16 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.reads.repair.BlockingReadRepair;
import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
@@ -440,4 +453,56 @@ public class ReadRepairTest extends TestBaseImpl
assertEquals("No read repair requests were expected, found " +
requests, 0, requests);
}
}
-}
+
+ @Test
+ public void partitionDeletionRTTimestampTieTest() throws Throwable
+ {
+ try (Cluster cluster = init(builder()
+ .withNodes(3)
+ .withInstanceInitializer(RRHelper::install)
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE
distributed_test_keyspace.tbl0 (pk bigint,ck bigint,value bigint, PRIMARY KEY
(pk, ck)) WITH CLUSTERING ORDER BY (ck ASC) AND read_repair='blocking';"));
+ long pk = 0L;
+ cluster.coordinator(1).execute("INSERT INTO
distributed_test_keyspace.tbl0 (pk, ck, value) VALUES (?,?,?) USING TIMESTAMP
1", ConsistencyLevel.ALL, pk, 1L, 1L);
+ cluster.coordinator(1).execute("DELETE FROM
distributed_test_keyspace.tbl0 USING TIMESTAMP 2 WHERE pk=? AND ck>?;",
ConsistencyLevel.ALL, pk, 2L);
+ cluster.get(3).executeInternal("DELETE FROM
distributed_test_keyspace.tbl0 USING TIMESTAMP 2 WHERE pk=?;", pk);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM
distributed_test_keyspace.tbl0 WHERE pk=? AND ck>=? AND ck<?;",
+ ConsistencyLevel.ALL,
pk, 1L, 3L));
+ }
+ }
+
+ public static class RRHelper
+ {
+ static void install(ClassLoader cl, int nodeNumber)
+ {
+ // Only on coordinating node
+ if (nodeNumber == 1)
+ {
+ new ByteBuddy().rebase(BlockingReadRepair.class)
+ .method(named("repairPartition"))
+ .intercept(MethodDelegation.to(RRHelper.class))
+ .make()
+ .load(cl,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+
+        // This verifies new behaviour in 4.0 that was introduced in
CASSANDRA-15369, but did not work
+        // on a timestamp tie of RT and partition deletion: we should not
generate RT bounds in such a case,
+        // since monotonicity is already ensured by the partition deletion,
and the RT is unnecessary there.
+ // For details, see CASSANDRA-16453.
+ public static Object repairPartition(DecoratedKey partitionKey,
Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan,
@SuperCall Callable<Void> r) throws Exception
+ {
+ Assert.assertEquals(2, mutations.size());
+ for (Mutation value : mutations.values())
+ {
+ for (PartitionUpdate update : value.getPartitionUpdates())
+ {
+ Assert.assertFalse(update.hasRows());
+
Assert.assertFalse(update.partitionLevelDeletion().isLive());
+ }
+ }
+ return r.call();
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]