This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 5beab63b55 Improve the way we handle repair message timeouts to avoid hanging repairs
5beab63b55 is described below

commit 5beab63b5550efb5e31e5005f90649661a9fe595
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Aug 29 13:27:16 2022 +0200

    Improve the way we handle repair message timeouts to avoid hanging repairs
    
    Patch by marcuse; reviewed by David Capwell for CASSANDRA-17613
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 +++
 src/java/org/apache/cassandra/net/Verb.java        |  33 +++----
 .../cassandra/repair/AsymmetricRemoteSyncTask.java |   7 +-
 .../cassandra/repair/RepairMessageVerbHandler.java |   4 +
 .../cassandra/repair/StreamingRepairTask.java      |   9 +-
 .../cassandra/repair/SymmetricRemoteSyncTask.java  |  11 ---
 src/java/org/apache/cassandra/repair/SyncTask.java |  12 +++
 .../apache/cassandra/repair/ValidationTask.java    |   9 +-
 .../cassandra/repair/messages/RepairMessage.java   |  63 +++++++++++++
 .../apache/cassandra/service/StorageService.java   |  13 +++
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 .../distributed/test/RepairRequestTimeoutTest.java | 100 +++++++++++++++++++++
 14 files changed, 240 insertions(+), 37 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e1f8d19c30..96a37f53e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.7
+ * Avoid getting hanging repairs due to repair message timeouts (CASSANDRA-17613)
  * Prevent infinite loop in repair coordinator on FailSession (CASSANDRA-17834)
 Merged from 3.11:
  * Fix potential IndexOutOfBoundsException in PagingState in mixed mode clusters (CASSANDRA-17840)
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index e8aa297e77..f8d8d46db8 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -134,6 +134,8 @@ public class Config
     public volatile Integer repair_session_max_tree_depth = null;
     public volatile Integer repair_session_space_in_mb = null;
 
+    public volatile long repair_request_timeout_in_ms = 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
+
     public volatile boolean use_offheap_merkle_trees = true;
 
     public int storage_port = 7000;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 82a5ec73ba..f78a5b668a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1723,6 +1723,16 @@ public class DatabaseDescriptor
         return unit.convert(getBlockForPeersTimeoutInSeconds(), 
TimeUnit.SECONDS);
     }
 
+    public static long getRepairRpcTimeout(TimeUnit unit)
+    {
+        return unit.convert(conf.repair_request_timeout_in_ms, MILLISECONDS);
+    }
+
+    public static void setRepairRpcTimeout(long time, TimeUnit unit)
+    {
+        conf.repair_request_timeout_in_ms = MILLISECONDS.convert(time, unit);
+    }
+
     public static double getPhiConvictThreshold()
     {
         return conf.phi_convict_threshold;
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index fad2fbf6a9..9d8b76dd35 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -147,22 +147,22 @@ public enum Verb
     SCHEMA_VERSION_REQ     (20,  P1, rpcTimeout,      MIGRATION,         () -> 
NoPayload.serializer,                 () -> SchemaVersionVerbHandler.instance,  
 SCHEMA_VERSION_RSP  ),
 
     // repair; mostly doesn't use callbacks and sends responses as their own 
request messages, with matching sessions by uuid; should eventually harmonize 
and make idiomatic
-    REPAIR_RSP             (100, P1, rpcTimeout,      REQUEST_RESPONSE,  () -> 
NoPayload.serializer,                 () -> ResponseVerbHandler.instance        
                     ),
-    VALIDATION_RSP         (102, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
ValidationResponse.serializer,        () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    VALIDATION_REQ         (101, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
ValidationRequest.serializer,         () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    SYNC_RSP               (104, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
SyncResponse.serializer,              () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    SYNC_REQ               (103, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
SyncRequest.serializer,               () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    PREPARE_MSG            (105, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
PrepareMessage.serializer,            () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    SNAPSHOT_MSG           (106, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
SnapshotMessage.serializer,           () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    CLEANUP_MSG            (107, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
CleanupMessage.serializer,            () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    PREPARE_CONSISTENT_RSP (109, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    PREPARE_CONSISTENT_REQ (108, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
PrepareConsistentRequest.serializer,  () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    FINALIZE_PROPOSE_MSG   (110, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
FinalizePropose.serializer,           () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    FINALIZE_PROMISE_MSG   (111, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
FinalizePromise.serializer,           () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    FINALIZE_COMMIT_MSG    (112, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
FinalizeCommit.serializer,            () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    FAILED_SESSION_MSG     (113, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
FailSession.serializer,               () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    STATUS_RSP             (115, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
StatusResponse.serializer,            () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
-    STATUS_REQ             (114, P1, rpcTimeout,      ANTI_ENTROPY,      () -> 
StatusRequest.serializer,             () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    REPAIR_RSP             (100, P1, repairMsgTimeout,REQUEST_RESPONSE,  () -> 
NoPayload.serializer,                 () -> ResponseVerbHandler.instance        
                     ),
+    VALIDATION_RSP         (102, P1, longTimeout     ,ANTI_ENTROPY,      () -> 
ValidationResponse.serializer,        () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    VALIDATION_REQ         (101, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
ValidationRequest.serializer,         () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    SYNC_RSP               (104, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
SyncResponse.serializer,              () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    SYNC_REQ               (103, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
SyncRequest.serializer,               () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    PREPARE_MSG            (105, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
PrepareMessage.serializer,            () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    SNAPSHOT_MSG           (106, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
SnapshotMessage.serializer,           () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    CLEANUP_MSG            (107, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
CleanupMessage.serializer,            () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    PREPARE_CONSISTENT_RSP (109, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    PREPARE_CONSISTENT_REQ (108, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
PrepareConsistentRequest.serializer,  () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    FINALIZE_PROPOSE_MSG   (110, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
FinalizePropose.serializer,           () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    FINALIZE_PROMISE_MSG   (111, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
FinalizePromise.serializer,           () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    FINALIZE_COMMIT_MSG    (112, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
FinalizeCommit.serializer,            () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    FAILED_SESSION_MSG     (113, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
FailSession.serializer,               () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    STATUS_RSP             (115, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
StatusResponse.serializer,            () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
+    STATUS_REQ             (114, P1, repairMsgTimeout,ANTI_ENTROPY,      () -> 
StatusRequest.serializer,             () -> RepairMessageVerbHandler.instance,  
 REPAIR_RSP          ),
 
     REPLICATION_DONE_RSP   (82,  P0, rpcTimeout,      MISC,              () -> 
NoPayload.serializer,                 () -> ResponseVerbHandler.instance        
                     ),
     REPLICATION_DONE_REQ   (22,  P0, rpcTimeout,      MISC,              () -> 
NoPayload.serializer,                 () -> 
ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
@@ -450,4 +450,5 @@ class VerbTimeouts
     static final ToLongFunction<TimeUnit> pingTimeout     = 
DatabaseDescriptor::getPingTimeout;
     static final ToLongFunction<TimeUnit> longTimeout     = units -> 
Math.max(DatabaseDescriptor.getRpcTimeout(units), units.convert(5L, 
TimeUnit.MINUTES));
     static final ToLongFunction<TimeUnit> noTimeout       = units -> { throw 
new IllegalStateException(); };
+    static final ToLongFunction<TimeUnit> repairMsgTimeout= 
DatabaseDescriptor::getRepairRpcTimeout;
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java 
b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index 40a1f514be..9762c9f08f 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -20,20 +20,17 @@ package org.apache.cassandra.repair;
 
 import java.util.List;
 
+
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.net.Verb.SYNC_REQ;
-
 /**
  * AsymmetricRemoteSyncTask sends {@link SyncRequest} to target node to 
repair(stream)
  * data with other target replica.
@@ -53,7 +50,7 @@ public class AsymmetricRemoteSyncTask extends SyncTask 
implements CompletableRem
         SyncRequest request = new SyncRequest(desc, local, 
nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, true);
         String message = String.format("Forwarding streaming repair of %d 
ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, 
request.dst);
         Tracing.traceRepair(message);
-        MessagingService.instance().send(Message.out(SYNC_REQ, request), 
request.src);
+        sendRequest(request, request.src);
     }
 
     public void syncComplete(boolean success, List<SessionSummary> summaries)
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index bfc2657ab4..7488f2ebc1 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -124,6 +124,8 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     break;
 
                 case VALIDATION_REQ:
+                    // notify initiator that the message has been received, 
allowing this method to take as long as it needs to
+                    MessagingService.instance().send(message.emptyResponse(), 
message.from());
                     ValidationRequest validationRequest = (ValidationRequest) 
message.payload;
                     logger.debug("Validating {}", validationRequest);
                     // trigger read-only compaction
@@ -143,6 +145,8 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     break;
 
                 case SYNC_REQ:
+                    // notify initiator that the message has been received, 
allowing this method to take as long as it needs to
+                    MessagingService.instance().send(message.emptyResponse(), 
message.from());
                     // forwarded sync request
                     SyncRequest request = (SyncRequest) message.payload;
                     logger.debug("Syncing {}", request);
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index fbfbac8748..9c6caf46f5 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -39,7 +39,11 @@ import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.streaming.StreamOperation;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.net.Verb.SYNC_RSP;
+import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+
 
 /**
  * StreamingRepairTask performs data streaming between two remote replicas, 
neither of which is repair coordinator.
@@ -73,7 +77,10 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
     public void run()
     {
         logger.info("[streaming task #{}] Performing {}streaming repair of {} 
ranges with {}", desc.sessionId, asymmetric ? "asymmetric " : "", 
ranges.size(), dst);
-        createStreamPlan(dst).execute();
+        long start = approxTime.now();
+        StreamPlan streamPlan = createStreamPlan(dst);
+        logger.info("[streaming task #{}] Stream plan created in {}ms", 
desc.sessionId, MILLISECONDS.convert(approxTime.now() - start, NANOSECONDS));
+        streamPlan.execute();
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java 
b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index b4e2d9c5a1..629f4bb8aa 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -27,17 +27,12 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.net.Verb.SYNC_REQ;
-
 /**
  * SymmetricRemoteSyncTask sends {@link SyncRequest} to 
remote(non-coordinator) node
  * to repair(stream) data with other replica.
@@ -53,16 +48,10 @@ public class SymmetricRemoteSyncTask extends SyncTask 
implements CompletableRemo
         super(desc, r1, r2, differences, previewKind);
     }
 
-    void sendRequest(SyncRequest request, InetAddressAndPort to)
-    {
-        MessagingService.instance().send(Message.out(SYNC_REQ, request), to);
-    }
-
     @Override
     protected void startSync()
     {
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-
         SyncRequest request = new SyncRequest(desc, local, 
nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, false);
         Preconditions.checkArgument(nodePair.coordinator.equals(request.src));
         String message = String.format("Forwarding streaming repair of %d 
ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, 
request.dst);
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java 
b/src/java/org/apache/cassandra/repair/SyncTask.java
index fe9f09ed7e..24e206828d 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -32,9 +32,13 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 
+import static org.apache.cassandra.net.Verb.SYNC_REQ;
+
 public abstract class SyncTask extends AbstractFuture<SyncStat> implements 
Runnable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(SyncTask.class);
@@ -101,4 +105,12 @@ public abstract class SyncTask extends 
AbstractFuture<SyncStat> implements Runna
     }
 
     public void abort() {}
+
+    void sendRequest(SyncRequest request, InetAddressAndPort to)
+    {
+        RepairMessage.sendMessageWithFailureCB(request,
+                                               SYNC_REQ,
+                                               to,
+                                               this::setException);
+    }
 }
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java 
b/src/java/org/apache/cassandra/repair/ValidationTask.java
index 0161acf8d8..b4aef249c5 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -21,8 +21,7 @@ import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -53,8 +52,10 @@ public class ValidationTask extends 
AbstractFuture<TreeResponse> implements Runn
      */
     public void run()
     {
-        ValidationRequest request = new ValidationRequest(desc, nowInSec);
-        MessagingService.instance().send(Message.out(VALIDATION_REQ, request), 
endpoint);
+        RepairMessage.sendMessageWithFailureCB(new ValidationRequest(desc, 
nowInSec),
+                                               VALIDATION_REQ,
+                                               endpoint,
+                                               this::setException);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java 
b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 3137b4e474..165911dc9b 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -17,7 +17,23 @@
  */
 package org.apache.cassandra.repair.messages;
 
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.utils.CassandraVersion;
+
+import static org.apache.cassandra.net.MessageFlag.CALL_BACK_ON_FAILURE;
 
 /**
  * Base class of all repair related request/response messages.
@@ -26,10 +42,57 @@ import org.apache.cassandra.repair.RepairJobDesc;
  */
 public abstract class RepairMessage
 {
+    private static final CassandraVersion SUPPORTS_TIMEOUTS = new 
CassandraVersion("4.0.7-SNAPSHOT");
+    private static final Logger logger = 
LoggerFactory.getLogger(RepairMessage.class);
     public final RepairJobDesc desc;
 
     protected RepairMessage(RepairJobDesc desc)
     {
         this.desc = desc;
     }
+
+    public interface RepairFailureCallback
+    {
+        void onFailure(Exception e);
+    }
+
+    public static void sendMessageWithFailureCB(RepairMessage request, Verb 
verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+    {
+        RequestCallback<?> callback = new RequestCallback<Object>()
+        {
+            @Override
+            public void onResponse(Message<Object> msg)
+            {
+                logger.info("[#{}] {} received by {}", 
request.desc.parentSessionId, verb, endpoint);
+                // todo: at some point we should make repair messages follow 
the normal path, actually using this
+            }
+
+            @Override
+            public boolean invokeOnFailure()
+            {
+                return true;
+            }
+
+            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+            {
+                logger.error("[#{}] {} failed on {}: {}", 
request.desc.parentSessionId, verb, from, failureReason);
+
+                if (supportsTimeouts(from, request.desc.parentSessionId))
+                    failureCallback.onFailure(new 
RepairException(request.desc, String.format("Got %s failure from %s: %s", verb, 
from, failureReason)));
+            }
+        };
+
+        MessagingService.instance().sendWithCallback(Message.outWithFlag(verb, 
request, CALL_BACK_ON_FAILURE),
+                                                     endpoint,
+                                                     callback);
+    }
+
+    private static boolean supportsTimeouts(InetAddressAndPort from, UUID 
parentSessionId)
+    {
+        CassandraVersion remoteVersion = 
Gossiper.instance.getReleaseVersion(from);
+        if (remoteVersion != null && 
remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0)
+            return true;
+        logger.warn("[#{}] Not failing repair due to remote host {} not 
supporting repair message timeouts (version = {})", parentSessionId, from, 
remoteVersion);
+        return false;
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 4568c5af8f..4721113ad3 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -6043,4 +6043,17 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         logger.info("Changing keyspace count warn threshold from {} to {}", 
getKeyspaceCountWarnThreshold(), value);
         DatabaseDescriptor.setKeyspaceCountWarnThreshold(value);
     }
+
+    public Long getRepairRpcTimeout()
+    {
+        return DatabaseDescriptor.getRepairRpcTimeout(MILLISECONDS);
+    }
+
+    public void setRepairRpcTimeout(Long timeoutInMillis)
+    {
+        Preconditions.checkState(timeoutInMillis > 0);
+        DatabaseDescriptor.setRepairRpcTimeout(timeoutInMillis, MILLISECONDS);
+        logger.info("RepairRpcTimeout set to {}ms via JMX", timeoutInMillis);
+    }
+
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 30e643bc1d..c61e45e7a4 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -862,4 +862,7 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     void setTableCountWarnThreshold(int value);
     int getKeyspaceCountWarnThreshold();
     void setKeyspaceCountWarnThreshold(int value);
+
+    public Long getRepairRpcTimeout();
+    public void setRepairRpcTimeout(Long timeoutInMillis);
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java
new file mode 100644
index 0000000000..33e0e78724
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairRequestTimeoutTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.net.Verb;
+
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.net.Verb.SYNC_REQ;
+import static org.apache.cassandra.net.Verb.VALIDATION_REQ;
+import static org.junit.Assert.assertTrue;
+
+public class RepairRequestTimeoutTest extends TestBaseImpl
+{
+    static Cluster CLUSTER;
+    static final long timeoutMillis = 1000;
+    @BeforeClass
+    public static void setup() throws IOException
+    {
+        CLUSTER = init(Cluster.build(3)
+                              .withConfig(config -> config.with(GOSSIP, 
NETWORK).set("repair_request_timeout_in_ms", timeoutMillis))
+                              .start());
+        CLUSTER.schemaChange(withKeyspace("create table %s.tbl (id int primary 
key)"));
+    }
+
+    @Before
+    public void before()
+    {
+        CLUSTER.filters().reset();
+    }
+
+    @Test
+    public void testLostSYNC_REQ()
+    {
+        testLostMessageHelper(SYNC_REQ);
+    }
+
+    @Test
+    public void testLostVALIDATION_REQ()
+    {
+        testLostMessageHelper(VALIDATION_REQ);
+    }
+
+    public void testLostMessageHelper(Verb verb)
+    {
+        for (int i = 0; i < 10; i++)
+            CLUSTER.coordinator(1).execute(withKeyspace("insert into %s.tbl 
(id) values (?)"), ConsistencyLevel.ALL, i);
+        for (int  i = 10; i < 20; i++)
+            CLUSTER.get((i % 3) + 1).executeInternal(withKeyspace("insert into 
%s.tbl (id) values (?)"), i);
+        CLUSTER.forEach(i -> i.flush(KEYSPACE));
+        CLUSTER.filters().verbs(verb.id).drop();
+        // symmetric vs asymmetric:
+        CLUSTER.get(1).nodetoolResult("repair", "-full", KEYSPACE, 
"tbl").asserts().failure().notificationContains(verb + " failure from");
+        CLUSTER.get(1).nodetoolResult("repair", "-full", "-os", KEYSPACE, 
"tbl").asserts().failure().notificationContains(verb + " failure from");
+
+        // and success
+        CLUSTER.filters().reset();
+        long mark = CLUSTER.get(1).logs().mark();
+
+        CLUSTER.get(1).nodetoolResult("repair", "-full", KEYSPACE, 
"tbl").asserts().success();
+        for (int  i = 10; i < 20; i++)
+            CLUSTER.get((i % 3) + 1).executeInternal(withKeyspace("insert into 
%s.tbl (id) values (?)"), i);
+
+        CLUSTER.get(1).nodetoolResult("repair", "-full", "-os", KEYSPACE, 
"tbl").asserts().success();
+        CLUSTER.get(1).runOnInstance(() -> {
+            // make sure we don't get any expirations after the repair has 
finished
+            long expirationInterval = 
DatabaseDescriptor.getMinRpcTimeout(MILLISECONDS) / 2; // see 
RequestCallbacks.java
+            sleepUninterruptibly((timeoutMillis + expirationInterval) * 2, 
MILLISECONDS);
+        });
+
+        assertTrue(CLUSTER.get(1).logs().grep(mark, "failure 
from").getResult().isEmpty());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to