This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 47cac5c49b Provide JMX endpoint to allow transient logging of blocking read repairs
47cac5c49b is described below
commit 47cac5c49b93d205fa9b3a57ce55470887c5be45
Author: Josh McKenzie <[email protected]>
AuthorDate: Tue Mar 22 11:35:36 2022 -0400
Provide JMX endpoint to allow transient logging of blocking read repairs
Patch by Josh McKenzie; reviewed by David Capwell for CASSANDRA-17471
Co-authored-by: Aleksey Yeschenko <[email protected]>
Co-authored-by: Josh McKenzie <[email protected]>
---
CHANGES.txt | 1 +
.../org/apache/cassandra/service/StorageProxy.java | 18 +++++++++++++++++-
.../apache/cassandra/service/StorageProxyMBean.java | 3 +++
.../cassandra/service/reads/AbstractReadExecutor.java | 14 +++++++++++++-
.../org/apache/cassandra/service/StorageProxyTest.java | 18 +++++++++++++++++-
5 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 377e91b75c..196d2471e0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Provide JMX endpoint to allow transient logging of blocking read repairs
(CASSANDRA-17471)
* Add guardrail for GROUP BY queries (CASSANDRA-17509)
* make pylib PEP and pylint compliant (CASSANDRA-17546)
* Add support for vnodes in jvm-dtest (CASSANDRA-17332)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 15e9e2d467..85c2699cb7 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
@@ -194,6 +195,8 @@ public class StorageProxy implements StorageProxyMBean
private static final PartitionDenylist partitionDenylist = new
PartitionDenylist();
+ private volatile long logBlockingReadRepairAttemptsUntilNanos =
Long.MIN_VALUE;
+
private StorageProxy()
{
}
@@ -2048,9 +2051,10 @@ public class StorageProxy implements StorageProxyMBean
// wait for enough responses to meet the consistency level. If there's
a digest mismatch, begin the read
// repair process by sending full data reads to all replicas we
received responses from.
+ boolean logBlockingRepairAttempts = instance.isLoggingReadRepairs();
for (int i=0; i<cmdCount; i++)
{
- reads[i].awaitResponses();
+ reads[i].awaitResponses(logBlockingRepairAttempts);
}
// read repair - if it looks like we may not receive enough full data
responses to meet CL, send
@@ -3019,6 +3023,18 @@ public class StorageProxy implements StorageProxyMBean
return !partitionDenylist.isKeyPermitted(keyspace, table, bytes);
}
+ @Override
+ public void logBlockingReadRepairAttemptsForNSeconds(int seconds)
+ {
+ logBlockingReadRepairAttemptsUntilNanos = nanoTime() +
TimeUnit.SECONDS.toNanos(seconds);
+ }
+
+ @Override
+ public boolean isLoggingReadRepairs()
+ {
+ return nanoTime() <=
StorageProxy.instance.logBlockingReadRepairAttemptsUntilNanos;
+ }
+
@Override
public void setPaxosVariant(String variant)
{
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 416a31284a..546143d515 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -91,6 +91,9 @@ public interface StorageProxyMBean
public String getIdealConsistencyLevel();
public String setIdealConsistencyLevel(String cl);
+ public void logBlockingReadRepairAttemptsForNSeconds(int seconds);
+ public boolean isLoggingReadRepairs();
+
/**
* Tracking and reporting of variances in the repaired data set across
replicas at read time
*/
diff --git
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 5f73bc696d..b5a759c3dc 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -366,10 +366,15 @@ public abstract class AbstractReadExecutor
this.result = DuplicateRowChecker.duringRead(result,
this.replicaPlan.get().readCandidates().endpointList());
}
+ public void awaitResponses() throws ReadTimeoutException
+ {
+ awaitResponses(false);
+ }
+
/**
* Wait for the CL to be satisfied by responses
*/
- public void awaitResponses() throws ReadTimeoutException
+ public void awaitResponses(boolean logBlockingReadRepairAttempt) throws
ReadTimeoutException
{
try
{
@@ -397,6 +402,13 @@ public abstract class AbstractReadExecutor
{
Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
readRepair.startRepair(digestResolver, this::setResult);
+ if (logBlockingReadRepairAttempt)
+ {
+ logger.info("Blocking Read Repair triggered for query [{}] at
CL.{} with endpoints {}",
+ command.toCQLString(),
+ replicaPlan().consistencyLevel(),
+ replicaPlan().contacts());
+ }
}
}
diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
index 1338cd675f..41742f0c08 100644
--- a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
@@ -18,11 +18,13 @@
package org.apache.cassandra.service;
+import java.net.UnknownHostException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -101,7 +103,21 @@ public class StorageProxyTest
});
}
- private void shouldHintTest(Consumer<Replica> test) throws Exception
+
+ /**
+ * Ensure that the timer backing the JMX endpoint to transiently enable
blocking read repairs both enables
+ * and disables the way we'd expect.
+ */
+ @Test
+ public void testTransientLoggingTimer()
+ {
+ StorageProxy.instance.logBlockingReadRepairAttemptsForNSeconds(2);
+ Assert.assertTrue(StorageProxy.instance.isLoggingReadRepairs());
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+ Assert.assertFalse(StorageProxy.instance.isLoggingReadRepairs());
+ }
+
+ private void shouldHintTest(Consumer<Replica> test) throws
UnknownHostException
{
InetAddressAndPort testEp =
InetAddressAndPort.getByName("192.168.1.1");
Replica replica = full(testEp);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]