This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6098762  Fail truncation requests when they fail on a replica
6098762 is described below

commit 609876275738589fdfb9a3e20cb2f594aa404037
Author: Ekaterina Dimitrova <[email protected]>
AuthorDate: Mon Oct 12 18:11:51 2020 -0400

    Fail truncation requests when they fail on a replica
    
    Patch by Ekaterina Dimitrova, reviewed by brandonwilliams for
    CASSANDRA-16208
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/db/TruncateVerbHandler.java   | 24 +++++++++----------
 .../cassandra/service/TruncateResponseHandler.java | 27 ++++++++++++++++++----
 3 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a7701c7..fe3fef8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta3
+ * Fail truncation requests when they fail on a replica (CASSANDRA-16208)
  * Move compact storage validation earlier in startup process (CASSANDRA-16063)
  * Fix ByteBufferAccessor cast exceptions are thrown when trying to query a 
virtual table (CASSANDRA-16155)
  * Consolidate node liveness check for forced repair (CASSANDRA-16113)
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java 
b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index c605d1f..0d71464 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -34,31 +34,31 @@ public class TruncateVerbHandler implements 
IVerbHandler<TruncateRequest>
 
     public void doVerb(Message<TruncateRequest> message)
     {
-        TruncateRequest t = message.payload;
-        Tracing.trace("Applying truncation of {}.{}", t.keyspace, t.table);
+        TruncateRequest truncation = message.payload;
+        Tracing.trace("Applying truncation of {}.{}", truncation.keyspace, 
truncation.table);
         try
         {
-            ColumnFamilyStore cfs = 
Keyspace.open(t.keyspace).getColumnFamilyStore(t.table);
+            ColumnFamilyStore cfs = 
Keyspace.open(truncation.keyspace).getColumnFamilyStore(truncation.table);
             cfs.truncateBlocking();
         }
-        catch (Exception e)
+        catch (Throwable throwable)
         {
-            logger.error("Error in truncation", e);
-            respondError(t, message);
+            logger.error("Error in truncation", throwable);
+            respondError(truncation, message);
 
-            if (FSError.findNested(e) != null)
-                throw FSError.findNested(e);
+            if (FSError.findNested(throwable) != null)
+                throw FSError.findNested(throwable);
         }
         Tracing.trace("Enqueuing response to truncate operation to {}", 
message.from());
 
-        TruncateResponse response = new TruncateResponse(t.keyspace, t.table, 
true);
-        logger.trace("{} applied.  Enqueuing response to {}@{} ", t, 
message.id(), message.from());
+        TruncateResponse response = new TruncateResponse(truncation.keyspace, 
truncation.table, true);
+        logger.trace("{} applied.  Enqueuing response to {}@{} ", truncation, 
message.id(), message.from());
         MessagingService.instance().send(message.responseWith(response), 
message.from());
     }
 
-    private static void respondError(TruncateRequest t, Message 
truncateRequestMessage)
+    private static void respondError(TruncateRequest truncation, Message 
truncateRequestMessage)
     {
-        TruncateResponse response = new TruncateResponse(t.keyspace, t.table, 
false);
+        TruncateResponse response = new TruncateResponse(truncation.keyspace, 
truncation.table, false);
         
MessagingService.instance().send(truncateRequestMessage.responseWith(response), 
truncateRequestMessage.from());
     }
 }
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java 
b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index bcd7426..c2651e6 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service;
 
+import java.net.InetAddress;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -24,19 +25,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TruncateResponse;
+import org.apache.cassandra.exceptions.TruncateException;
 import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
-public class TruncateResponseHandler implements RequestCallback
+public class TruncateResponseHandler implements 
RequestCallback<TruncateResponse>
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(TruncateResponseHandler.class);
     protected final SimpleCondition condition = new SimpleCondition();
     private final int responseCount;
     protected final AtomicInteger responses = new AtomicInteger(0);
     private final long start;
+    private volatile InetAddress truncateFailingReplica;
 
     public TruncateResponseHandler(int responseCount)
     {
@@ -51,24 +55,37 @@ public class TruncateResponseHandler implements 
RequestCallback
     public void get() throws TimeoutException
     {
         long timeoutNanos = 
DatabaseDescriptor.getTruncateRpcTimeout(NANOSECONDS) - (System.nanoTime() - 
start);
-        boolean success;
+        boolean completedInTime;
         try
         {
-            success = condition.await(timeoutNanos, NANOSECONDS); // TODO 
truncate needs a much longer timeout
+            completedInTime = condition.await(timeoutNanos, NANOSECONDS); // 
TODO truncate needs a much longer timeout
         }
         catch (InterruptedException ex)
         {
             throw new AssertionError(ex);
         }
 
-        if (!success)
+        if (!completedInTime)
         {
             throw new TimeoutException("Truncate timed out - received only " + 
responses.get() + " responses");
         }
+
+        if (truncateFailingReplica != null)
+        {
+            throw new TruncateException("Truncate failed on replica " + 
truncateFailingReplica);
+        }
     }
 
-    public void onResponse(Message message)
+    public void onResponse(Message<TruncateResponse> message)
     {
+        // If the truncation hasn't succeeded on some replica, abort and 
indicate this back to the client.
+        if (!message.payload.success)
+        {
+            truncateFailingReplica = message.from().address;
+            condition.signalAll();
+            return;
+        }
+
         responses.incrementAndGet();
         if (responses.get() >= responseCount)
             condition.signalAll();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to