This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6098762 Fail truncation requests when they fail on a replica
6098762 is described below
commit 609876275738589fdfb9a3e20cb2f594aa404037
Author: Ekaterina Dimitrova <[email protected]>
AuthorDate: Mon Oct 12 18:11:51 2020 -0400
Fail truncation requests when they fail on a replica
Patch by Ekaterina Dimitrova, reviewed by brandonwilliams for
CASSANDRA-16208
---
CHANGES.txt | 1 +
.../apache/cassandra/db/TruncateVerbHandler.java | 24 +++++++++----------
.../cassandra/service/TruncateResponseHandler.java | 27 ++++++++++++++++++----
3 files changed, 35 insertions(+), 17 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a7701c7..fe3fef8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta3
+ * Fail truncation requests when they fail on a replica (CASSANDRA-16208)
* Move compact storage validation earlier in startup process (CASSANDRA-16063)
* Fix ByteBufferAccessor cast exceptions are thrown when trying to query a
virtual table (CASSANDRA-16155)
* Consolidate node liveness check for forced repair (CASSANDRA-16113)
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index c605d1f..0d71464 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -34,31 +34,31 @@ public class TruncateVerbHandler implements IVerbHandler<TruncateRequest>
public void doVerb(Message<TruncateRequest> message)
{
- TruncateRequest t = message.payload;
- Tracing.trace("Applying truncation of {}.{}", t.keyspace, t.table);
+ TruncateRequest truncation = message.payload;
+ Tracing.trace("Applying truncation of {}.{}", truncation.keyspace,
truncation.table);
try
{
- ColumnFamilyStore cfs =
Keyspace.open(t.keyspace).getColumnFamilyStore(t.table);
+ ColumnFamilyStore cfs =
Keyspace.open(truncation.keyspace).getColumnFamilyStore(truncation.table);
cfs.truncateBlocking();
}
- catch (Exception e)
+ catch (Throwable throwable)
{
- logger.error("Error in truncation", e);
- respondError(t, message);
+ logger.error("Error in truncation", throwable);
+ respondError(truncation, message);
- if (FSError.findNested(e) != null)
- throw FSError.findNested(e);
+ if (FSError.findNested(throwable) != null)
+ throw FSError.findNested(throwable);
}
Tracing.trace("Enqueuing response to truncate operation to {}",
message.from());
- TruncateResponse response = new TruncateResponse(t.keyspace, t.table,
true);
- logger.trace("{} applied. Enqueuing response to {}@{} ", t,
message.id(), message.from());
+ TruncateResponse response = new TruncateResponse(truncation.keyspace,
truncation.table, true);
+ logger.trace("{} applied. Enqueuing response to {}@{} ", truncation,
message.id(), message.from());
MessagingService.instance().send(message.responseWith(response),
message.from());
}
- private static void respondError(TruncateRequest t, Message
truncateRequestMessage)
+ private static void respondError(TruncateRequest truncation, Message
truncateRequestMessage)
{
- TruncateResponse response = new TruncateResponse(t.keyspace, t.table,
false);
+ TruncateResponse response = new TruncateResponse(truncation.keyspace,
truncation.table, false);
MessagingService.instance().send(truncateRequestMessage.responseWith(response),
truncateRequestMessage.from());
}
}
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index bcd7426..c2651e6 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.service;
+import java.net.InetAddress;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,19 +25,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TruncateResponse;
+import org.apache.cassandra.exceptions.TruncateException;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-public class TruncateResponseHandler implements RequestCallback
+public class TruncateResponseHandler implements
RequestCallback<TruncateResponse>
{
protected static final Logger logger =
LoggerFactory.getLogger(TruncateResponseHandler.class);
protected final SimpleCondition condition = new SimpleCondition();
private final int responseCount;
protected final AtomicInteger responses = new AtomicInteger(0);
private final long start;
+ private volatile InetAddress truncateFailingReplica;
public TruncateResponseHandler(int responseCount)
{
@@ -51,24 +55,37 @@ public class TruncateResponseHandler implements RequestCallback
public void get() throws TimeoutException
{
long timeoutNanos =
DatabaseDescriptor.getTruncateRpcTimeout(NANOSECONDS) - (System.nanoTime() -
start);
- boolean success;
+ boolean completedInTime;
try
{
- success = condition.await(timeoutNanos, NANOSECONDS); // TODO
truncate needs a much longer timeout
+ completedInTime = condition.await(timeoutNanos, NANOSECONDS); //
TODO truncate needs a much longer timeout
}
catch (InterruptedException ex)
{
throw new AssertionError(ex);
}
- if (!success)
+ if (!completedInTime)
{
throw new TimeoutException("Truncate timed out - received only " +
responses.get() + " responses");
}
+
+ if (truncateFailingReplica != null)
+ {
+ throw new TruncateException("Truncate failed on replica " +
truncateFailingReplica);
+ }
}
- public void onResponse(Message message)
+ public void onResponse(Message<TruncateResponse> message)
{
+ // If the truncation hasn't succeeded on some replica, abort and
indicate this back to the client.
+ if (!message.payload.success)
+ {
+ truncateFailingReplica = message.from().address;
+ condition.signalAll();
+ return;
+ }
+
responses.incrementAndGet();
if (responses.get() >= responseCount)
condition.signalAll();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]