This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9b6e54e  Avoid trying to keep track of RTs for endpoints we won't write to during read repair
9b6e54e is described below

commit 9b6e54ee02dc68a99965f1ea9533fd5e781e4cbe
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Aug 31 16:13:27 2020 +0200

    Avoid trying to keep track of RTs for endpoints we won't write to during read repair
    
    Patch by marcuse; reviewed by Benedict Elliott Smith for CASSANDRA-16084
---
 CHANGES.txt                                        |  1 +
 .../reads/repair/RowIteratorMergeListener.java     |  3 +
 .../cassandra/distributed/test/ReadRepairTest.java | 72 ++++++++++++++++++++++
 3 files changed, 76 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 92c6b87..9188115 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 4.0-beta3
  * Add nodetool getfullquerylog (CASSANDRA-15988)
  * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
+ * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
 
 4.0-beta2
  * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 12d27d2..2be3bc2 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -238,6 +238,9 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
 
         for (int i = 0; i < versions.length; i++)
         {
+            // we are not collecting a mutation for this version/source, skip;
+            if (!writeBackTo.get(i))
+                continue;
             RangeTombstoneMarker marker = versions[i];
 
             // Update what the source now thinks is the current deletion
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
index f0c82b8..6f91d9b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@ -21,19 +21,28 @@ package org.apache.cassandra.distributed.test;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.junit.Test;
 
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
 import static org.apache.cassandra.net.Verb.READ_REQ;
@@ -179,6 +188,69 @@ public class ReadRepairTest extends TestBaseImpl
         }
     }
 
+    @Test
+    public void readRepairRTRangeMovementTest() throws Throwable
+    {
+        ExecutorService es = Executors.newFixedThreadPool(1);
+        String key = "test1";
+        try (Cluster cluster = init(Cluster.build()
+                                           .withConfig(config -> 
config.with(Feature.GOSSIP, Feature.NETWORK)
+                                                                       
.set("read_request_timeout_in_ms", Integer.MAX_VALUE))
+                                           
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+                                           
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
+                                           .withNodes(3)
+                                           .start()))
+        {
+            cluster.schemaChange("CREATE TABLE distributed_test_keyspace.tbl 
(\n" +
+                                 "    key text,\n" +
+                                 "    column1 int,\n" +
+                                 "    PRIMARY KEY (key, column1)\n" +
+                                 ") WITH CLUSTERING ORDER BY (column1 ASC)");
+
+            cluster.forEach(i -> i.runOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction()));
+
+            for (int i = 1; i <= 2; i++)
+            {
+                cluster.get(i).executeInternal("DELETE FROM 
distributed_test_keyspace.tbl USING TIMESTAMP 50 WHERE key=?;", key);
+                cluster.get(i).executeInternal("DELETE FROM 
distributed_test_keyspace.tbl USING TIMESTAMP 80 WHERE key=? and column1 >= ? 
and column1 < ?;", key, 10, 100);
+                cluster.get(i).executeInternal("DELETE FROM 
distributed_test_keyspace.tbl USING TIMESTAMP 70 WHERE key=? and column1 = ?;", 
key, 30);
+                cluster.get(i).flush(KEYSPACE);
+            }
+            cluster.get(3).executeInternal("DELETE FROM 
distributed_test_keyspace.tbl USING TIMESTAMP 100 WHERE key=?;", key);
+            cluster.get(3).flush(KEYSPACE);
+
+            // pause the read until we have bootstrapped a new node below
+            SimpleCondition continueRead = new SimpleCondition();
+            SimpleCondition readStarted = new SimpleCondition();
+            
cluster.filters().outbound().from(3).to(1,2).verbs(Verb.READ_REQ.id).messagesMatching((i,
 i1, iMessage) -> {
+                try
+                {
+                    readStarted.signalAll();
+                    continueRead.await();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                return false;
+            }).drop();
+            Future<Object[][]> read = es.submit(() -> cluster.coordinator(3)
+                                                          .execute("SELECT * 
FROM distributed_test_keyspace.tbl WHERE key=? and column1 >= ? and column1 <= 
?",
+                                                                   
ConsistencyLevel.ALL, key, 20, 40));
+            readStarted.await();
+            IInstanceConfig config = cluster.newInstanceConfig();
+            config.set("auto_bootstrap", true);
+            cluster.bootstrap(config).startup();
+            continueRead.signalAll();
+            read.get();
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
+
     private void consume(Iterator<Object[]> it)
     {
         while (it.hasNext())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to