This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 9b6e54e  Avoid trying to keep track of RTs for endpoints we won't write to during read repair
9b6e54e is described below

commit 9b6e54ee02dc68a99965f1ea9533fd5e781e4cbe
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Aug 31 16:13:27 2020 +0200

    Avoid trying to keep track of RTs for endpoints we won't write to during read repair

    Patch by marcuse; reviewed by Benedict Elliott Smith for CASSANDRA-16084
---
 CHANGES.txt                                        |  1 +
 .../reads/repair/RowIteratorMergeListener.java     |  3 +
 .../cassandra/distributed/test/ReadRepairTest.java | 72 ++++++++++++++++++++++
 3 files changed, 76 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 92c6b87..9188115 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 4.0-beta3
  * Add nodetool getfullquerylog (CASSANDRA-15988)
  * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
+ * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
 
 4.0-beta2
  * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 12d27d2..2be3bc2 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -238,6 +238,9 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
 
         for (int i = 0; i < versions.length; i++)
         {
+            // we are not collecting a mutation for this version/source, skip;
+            if (!writeBackTo.get(i))
+                continue;
             RangeTombstoneMarker marker = versions[i];
 
             // Update what the source now thinks is the current deletion
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
index f0c82b8..6f91d9b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@ -21,19 +21,28 @@ package org.apache.cassandra.distributed.test;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.junit.Test;
 
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
 import static org.apache.cassandra.net.Verb.READ_REQ;
@@ -179,6 +188,69 @@ public class ReadRepairTest extends TestBaseImpl
         }
     }
 
+    @Test
+    public void readRepairRTRangeMovementTest() throws Throwable
+    {
+        ExecutorService es = Executors.newFixedThreadPool(1);
+        String key = "test1";
+        try (Cluster cluster = init(Cluster.build()
+                                           .withConfig(config -> config.with(Feature.GOSSIP, Feature.NETWORK)
+                                                                       .set("read_request_timeout_in_ms", Integer.MAX_VALUE))
+                                           .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+                                           .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
+                                           .withNodes(3)
+                                           .start()))
+        {
+            cluster.schemaChange("CREATE TABLE distributed_test_keyspace.tbl (\n" +
+                                 " key text,\n" +
+                                 " column1 int,\n" +
+                                 " PRIMARY KEY (key, column1)\n" +
+                                 ") WITH CLUSTERING ORDER BY (column1 ASC)");
+
+            cluster.forEach(i -> i.runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction()));
+
+            for (int i = 1; i <= 2; i++)
+            {
+                cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 50 WHERE key=?;", key);
+                cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 80 WHERE key=? and column1 >= ? and column1 < ?;", key, 10, 100);
+                cluster.get(i).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 70 WHERE key=? and column1 = ?;", key, 30);
+                cluster.get(i).flush(KEYSPACE);
+            }
+            cluster.get(3).executeInternal("DELETE FROM distributed_test_keyspace.tbl USING TIMESTAMP 100 WHERE key=?;", key);
+            cluster.get(3).flush(KEYSPACE);
+
+            // pause the read until we have bootstrapped a new node below
+            SimpleCondition continueRead = new SimpleCondition();
+            SimpleCondition readStarted = new SimpleCondition();
+            cluster.filters().outbound().from(3).to(1,2).verbs(Verb.READ_REQ.id).messagesMatching((i, i1, iMessage) -> {
+                try
+                {
+                    readStarted.signalAll();
+                    continueRead.await();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                return false;
+            }).drop();
+            Future<Object[][]> read = es.submit(() -> cluster.coordinator(3)
+                                                          .execute("SELECT * FROM distributed_test_keyspace.tbl WHERE key=? and column1 >= ? and column1 <= ?",
+                                                                   ConsistencyLevel.ALL, key, 20, 40));
+            readStarted.await();
+            IInstanceConfig config = cluster.newInstanceConfig();
+            config.set("auto_bootstrap", true);
+            cluster.bootstrap(config).startup();
+            continueRead.signalAll();
+            read.get();
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
+
     private void consume(Iterator<Object[]> it)
     {
         while (it.hasNext())

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org