This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 9abe212 Prevent read repair mutations from increasing read timeout 9abe212 is described below commit 9abe2127dde7ea317928b37b8b5c662e787b2192 Author: yifan-c <yc25c...@gmail.com> AuthorDate: Thu Dec 5 15:00:19 2019 -0800 Prevent read repair mutations from increasing read timeout Patch by Yifan Cai; Reviewed by Blake Eggleston and Jordan West for CASSANDRA-15442 --- CHANGES.txt | 1 + .../reads/repair/BlockingPartitionRepair.java | 16 +++++++--- .../service/reads/repair/BlockingReadRepair.java | 2 +- .../test/DistributedReadWritePathTest.java | 37 ++++++++++++++++++++++ .../reads/repair/BlockingReadRepairTest.java | 26 ++++++++------ .../repair/DiagEventsBlockingReadRepairTest.java | 11 +++++-- .../service/reads/repair/ReadRepairTest.java | 23 ++++++++------ 7 files changed, 87 insertions(+), 29 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 6783b2c..2c6a1d9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha3 + * Prevent read repair mutations from increasing read timeout (CASSANDRA-15442) * Document 4.0 system keyspace changes, bump generations (CASSANDRA-15454) * Make it possible to disable STCS-in-L0 during runtime (CASSANDRA-15445) * Removed obsolete OldNetworkTopologyStrategy (CASSANDRA-13990) diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java index 220ada5..01fd7f0 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java @@ -161,11 +161,17 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl } } - public boolean awaitRepairs(long timeout, TimeUnit timeoutUnit) + /** + * Wait for the repair to complete until a future time. + * If {@code timeoutAt} is in the past, the method returns 
immediately with the repair result. + * @param timeoutAt the deadline, as a {@code System.nanoTime()}-based time expressed in {@code timeUnit} + * @param timeUnit the time unit of {@code timeoutAt} + * @return true if the repair completed before the deadline; otherwise false. + */ + public boolean awaitRepairsUntil(long timeoutAt, TimeUnit timeUnit) { - long elapsed = System.nanoTime() - mutationsSentTime; - long remaining = timeoutUnit.toNanos(timeout) - elapsed; - + long timeoutAtNanos = timeUnit.toNanos(timeoutAt); + long remaining = timeoutAtNanos - System.nanoTime(); try { return latch.await(remaining, TimeUnit.NANOSECONDS); @@ -190,7 +196,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl */ public void maybeSendAdditionalWrites(long timeout, TimeUnit timeoutUnit) { - if (awaitRepairs(timeout, timeoutUnit)) + if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit)) return; E newCandidates = replicaPlan.uncontactedCandidates(); diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java index ef624d6..764765e 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java @@ -87,7 +87,7 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo boolean timedOut = false; for (BlockingPartitionRepair repair: repairs) { - if (!repair.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS), NANOSECONDS)) + if (!repair.awaitRepairsUntil(DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) + queryStartNanoTime, NANOSECONDS)) { timedOut = true; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java index e0c6916..0870ab3 100644 --- 
a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java @@ -18,6 +18,8 @@ package org.apache.cassandra.distributed.test; +import java.util.concurrent.TimeUnit; + import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -29,6 +31,7 @@ import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.impl.IInvokableInstance; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP; import static org.junit.Assert.assertEquals; import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ; @@ -127,6 +130,40 @@ public class DistributedReadWritePathTest extends DistributedTestBase } @Test + public void readRepairTimeoutTest() throws Throwable + { + final long reducedReadTimeout = 3000L; + try (Cluster cluster = init(Cluster.create(3))) + { + cluster.forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout))); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + cluster.verbs(READ_REPAIR_RSP).to(1).drop(); + final long start = System.currentTimeMillis(); + try + { + cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL); + Assert.fail("Read timeout expected but it did not occur"); + } + catch (Exception ex) + { + // the containing exception class was loaded by another class loader. 
Comparing the message as a workaround to assert the exception + Assert.assertTrue(ex.getMessage().contains("org.apache.cassandra.exceptions.ReadTimeoutException")); + long actualTimeTaken = System.currentTimeMillis() - start; + long magicDelayAmount = 100L; // it might not be the best way to check if the time taken is around the timeout value. + // Due to the delays, the actual time taken from the client's perspective is slightly more than the timeout value + Assert.assertTrue(actualTimeTaken > reducedReadTimeout); + // But it should not exceed the timeout by much + Assert.assertTrue(actualTimeTaken < reducedReadTimeout + magicDelayAmount); + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, 1)); // only the repaired node's ack was dropped, so the mutation should in fact have been applied. + } + } + } + + @Test + public void failingReadRepairTest() throws Throwable { try (Cluster cluster = init(Cluster.create(3))) diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java index 7538832..3cc1a63 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java @@ -25,7 +25,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; -import org.apache.cassandra.locator.ReplicaPlan; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -38,6 +37,7 @@ import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.Message; import 
org.apache.cassandra.service.reads.ReadCallback; @@ -158,11 +158,11 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest assertMutationEqual(resolved, handler.mutationsSent.get(target3)); // check repairs stop blocking after receiving 2 acks - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target1); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target3); - Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertTrue(getCurrentRepairStatus(handler)); } @@ -243,14 +243,13 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); handler.sendInitialRepairs(); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target1); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); // here we should stop blocking, even though we've sent 3 repairs handler.ack(target2); - Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); - + Assert.assertTrue(getCurrentRepairStatus(handler)); } /** @@ -275,14 +274,19 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint())); Assert.assertEquals(1, handler.waitingOn()); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(remote1.endpoint()); Assert.assertEquals(1, handler.waitingOn()); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(replica1.endpoint()); Assert.assertEquals(0, handler.waitingOn()); - 
Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertTrue(getCurrentRepairStatus(handler)); + } + + private boolean getCurrentRepairStatus(BlockingPartitionRepair handler) + { + return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS); } } diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java index 3bcd757..c15d7f4 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java @@ -100,11 +100,16 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest Assert.assertEquals(resolved.toString(), handler.updatesByEp.get(target3)); // check repairs stop blocking after receiving 2 acks - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target1); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target3); - Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertTrue(getCurrentRepairStatus(handler)); + } + + private boolean getCurrentRepairStatus(BlockingPartitionRepair handler) + { + return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS); } public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime) diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java index 232644d..5ae9dd8 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java @@ 
-216,11 +216,11 @@ public class ReadRepairTest assertMutationEqual(resolved, handler.mutationsSent.get(target3.endpoint())); // check repairs stop blocking after receiving 2 acks - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target1.endpoint()); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target3.endpoint()); - Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertTrue(getCurrentRepairStatus(handler)); } /** @@ -304,13 +304,13 @@ public class ReadRepairTest InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas); handler.sendInitialRepairs(); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target1.endpoint()); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); // here we should stop blocking, even though we've sent 3 repairs handler.ack(target2.endpoint()); - Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertTrue(getCurrentRepairStatus(handler)); } @@ -337,14 +337,19 @@ public class ReadRepairTest Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint())); Assert.assertEquals(1, handler.waitingOn()); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(remote1.endpoint()); Assert.assertEquals(1, handler.waitingOn()); - Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertFalse(getCurrentRepairStatus(handler)); handler.ack(target1.endpoint()); Assert.assertEquals(0, handler.waitingOn()); - Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + Assert.assertTrue(getCurrentRepairStatus(handler)); + } + 
+ private boolean getCurrentRepairStatus(BlockingPartitionRepair handler) + { + return handler.awaitRepairsUntil(System.nanoTime(), TimeUnit.NANOSECONDS); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org