This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit cd576a0d140c94007fd0181044df453c0681252f
Merge: 726b67b1e4 5beab63b55
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Aug 29 13:27:46 2022 +0200

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |   4 +
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 src/java/org/apache/cassandra/net/Verb.java        |   2 +-
 .../cassandra/repair/AsymmetricRemoteSyncTask.java |   7 +-
 .../cassandra/repair/RepairMessageVerbHandler.java |   5 +-
 .../cassandra/repair/StreamingRepairTask.java      |   9 +-
 .../cassandra/repair/SymmetricRemoteSyncTask.java  |  10 ---
 src/java/org/apache/cassandra/repair/SyncTask.java |  11 +++
 .../apache/cassandra/repair/ValidationTask.java    |   9 +-
 .../cassandra/repair/messages/RepairMessage.java   |  63 +++++++++++++
 .../cassandra/service/ActiveRepairService.java     |   2 -
 .../distributed/test/RepairCoordinatorBase.java    |   3 +
 .../distributed/test/RepairRequestTimeoutTest.java | 100 +++++++++++++++++++++
 .../upgrade/RepairRequestTimeoutUpgradeTest.java   |  58 ++++++++++++
 14 files changed, 260 insertions(+), 24 deletions(-)

diff --cc CHANGES.txt
index 4335f0ea2f,96a37f53e9..bf0ea108f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,47 -1,12 +1,51 @@@
 +4.1-alpha2
 + * Fix BulkLoader to load entireSSTableThrottle and 
entireSSTableInterDcThrottle (CASSANDRA-17677)
 + * Fix a race condition where a keyspace can be opened while it is being 
removed (CASSANDRA-17658)
 + * DatabaseDescriptor will set the default failure detector during client 
initialization (CASSANDRA-17782)
 + * Avoid initializing schema via SystemKeyspace.getPreferredIP() with the 
BulkLoader tool (CASSANDRA-17740)
 + * Uncomment prepared_statements_cache_size, key_cache_size, 
counter_cache_size, index_summary_capacity which were
 +   commented out by mistake in a previous patch
 +   Fix breaking change with cache_load_timeout; cache_load_timeout_seconds 
<=0 and cache_load_timeout=0 are equivalent
 +   and they both mean disabled
 +   Deprecate public method setRate(final double throughputMbPerSec) in 
Compaction Manager in favor of
 +   setRateInBytes(final double throughputBytesPerSec)
 +   Revert breaking change removal of 
StressCQLSSTableWriter.Builder.withBufferSizeInMB(int size). Deprecate it in 
favor
 +   of StressCQLSSTableWriter.Builder.withBufferSizeInMiB(int size)
 +   Fix precision issues, add new -m flag (for nodetool/setstreamthroughput, 
nodetool/setinterdcstreamthroughput,
 +   nodetool/getstreamthroughput and nodetool/getinterdcstreamthroughput), add 
new -d flags (nodetool/getstreamthroughput, 
nodetool/getinterdcstreamthroughput, nodetool/getcompactionthroughput)
 +   Fix a bug with precision in nodetool/compactionstats
 +   Deprecate StorageService methods and add new ones for 
stream_throughput_outbound, inter_dc_stream_throughput_outbound,
 +   compaction_throughput_outbound in the JMX MBean 
`org.apache.cassandra.db:type=StorageService`
 +   Removed getEntireSSTableStreamThroughputMebibytesPerSec in favor of new 
getEntireSSTableStreamThroughputMebibytesPerSecAsDouble
 +   in the JMX MBean `org.apache.cassandra.db:type=StorageService`
 +   Removed getEntireSSTableInterDCStreamThroughputMebibytesPerSec in favor of 
getEntireSSTableInterDCStreamThroughputMebibytesPerSecAsDouble
 +   in the JMX MBean `org.apache.cassandra.db:type=StorageService` 
(CASSANDRA-17725)
 + * Fix sstable_preemptive_open_interval disabled value. 
sstable_preemptive_open_interval = null backward compatible with
 +   sstable_preemptive_open_interval_in_mb = -1 (CASSANDRA-17737)
 + * Remove usages of Path#toFile() in the snapshot apparatus (CASSANDRA-17769)
 + * Fix Settings Virtual Table to update paxos_variant after startup and 
rename enable_uuid_sstable_identifiers to
 +   uuid_sstable_identifiers_enabled as per our config naming conventions 
(CASSANDRA-17738)
 + * index_summary_resize_interval_in_minutes = -1 is equivalent to 
index_summary_resize_interval being set to null or
 +   disabled. JMX MBean IndexSummaryManager, setResizeIntervalInMinutes method 
still takes resizeIntervalInMinutes = -1 for disabled (CASSANDRA-17735)
 + * min_tracked_partition_size_bytes parameter from 4.1 alpha1 was renamed to 
min_tracked_partition_size (CASSANDRA-17733)
 + * Remove commons-lang dependency during build runtime (CASSANDRA-17724)
 + * Relax synchronization on StreamSession#onError() to avoid deadlock 
(CASSANDRA-17706)
 + * Fix AbstractCell#toString throws MarshalException for cell in collection 
(CASSANDRA-17695)
 + * Add new vtable output option to compactionstats (CASSANDRA-17683)
 + * Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog 
(CASSANDRA-17587)
 + * Fix widening to long in getBatchSizeFailThreshold (CASSANDRA-17650)
 + * Fix widening from mebibytes to bytes in IntMebibytesBound (CASSANDRA-17716)
 + * Revert breaking change in nodetool clientstats and expose client options 
through nodetool clientstats --client-options. (CASSANDRA-17715)
 + * Fix missed nowInSec values in QueryProcessor (CASSANDRA-17458)
 + * Revert removal of withBufferSizeInMB(int size) in CQLSSTableWriter.Builder 
class and deprecate it in favor of withBufferSizeInMiB(int size) 
(CASSANDRA-17675)
 + * Remove expired snapshots of dropped tables after restart (CASSANDRA-17619)
 +Merged from 4.0:
+ 4.0.7
+  * Avoid getting hanging repairs due to repair message timeouts 
(CASSANDRA-17613)
 - * Prevent infinite loop in repair coordinator on FailSession 
(CASSANDRA-17834)
 -Merged from 3.11:
 - * Fix potential IndexOutOfBoundsException in PagingState in mixed mode 
clusters (CASSANDRA-17840)
 -Merged from 3.0:
 - * Fix scrubber falling into infinite loop when the last partition is broken 
(CASSANDRA-17862)
++
+ 
  4.0.6
 + * Prevent infinite loop in repair coordinator on FailSession 
(CASSANDRA-17834)
   * Fix race condition on updating cdc size and advancing to next segment 
(CASSANDRA-17792)
   * Add 'noboolean' rpm build for older distros like CentOS7 (CASSANDRA-17765)
   * Fix default value for compaction_throughput_mb_per_sec in Config class to 
match  the one in cassandra.yaml (CASSANDRA-17790)
diff --cc src/java/org/apache/cassandra/config/Config.java
index 1c5948dc73,f8d8d46db8..7fd9b05816
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -123,41 -91,28 +123,42 @@@ public class Confi
      /** Triggers automatic allocation of tokens if set, based on the provided 
replica count for a datacenter */
      public Integer allocate_tokens_for_local_replication_factor = null;
  
 -    public long native_transport_idle_timeout_in_ms = 0L;
 +    @Replaces(oldName = "native_transport_idle_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public DurationSpec.LongMillisecondsBound native_transport_idle_timeout = 
new DurationSpec.LongMillisecondsBound("0ms");
 +
 +    @Replaces(oldName = "request_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound request_timeout = new 
DurationSpec.LongMillisecondsBound("10000ms");
  
 -    public volatile long request_timeout_in_ms = 10000L;
 +    @Replaces(oldName = "read_request_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound read_request_timeout = 
new DurationSpec.LongMillisecondsBound("5000ms");
  
 -    public volatile long read_request_timeout_in_ms = 5000L;
 +    @Replaces(oldName = "range_request_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound range_request_timeout 
= new DurationSpec.LongMillisecondsBound("10000ms");
  
 -    public volatile long range_request_timeout_in_ms = 10000L;
 +    @Replaces(oldName = "write_request_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound write_request_timeout 
= new DurationSpec.LongMillisecondsBound("2000ms");
  
 -    public volatile long write_request_timeout_in_ms = 2000L;
 +    @Replaces(oldName = "counter_write_request_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound 
counter_write_request_timeout = new 
DurationSpec.LongMillisecondsBound("5000ms");
  
 -    public volatile long counter_write_request_timeout_in_ms = 5000L;
 +    @Replaces(oldName = "cas_contention_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound cas_contention_timeout 
= new DurationSpec.LongMillisecondsBound("1800ms");
  
 -    public volatile long cas_contention_timeout_in_ms = 1000L;
 +    @Replaces(oldName = "truncate_request_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound 
truncate_request_timeout = new DurationSpec.LongMillisecondsBound("60000ms");
  
 -    public volatile long truncate_request_timeout_in_ms = 60000L;
++    @Replaces(oldName = "repair_request_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound repair_request_timeout 
= new DurationSpec.LongMillisecondsBound("120000ms");
  
      public Integer streaming_connections_per_host = 1;
 -    public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes
 +    @Replaces(oldName = "streaming_keep_alive_period_in_secs", converter = 
Converters.SECONDS_DURATION, deprecated = true)
 +    public DurationSpec.IntSecondsBound streaming_keep_alive_period = new 
DurationSpec.IntSecondsBound("300s");
  
 -    public boolean cross_node_timeout = true;
 +    @Replaces(oldName = "cross_node_timeout", converter = 
Converters.IDENTITY, deprecated = true)
 +    public boolean internode_timeout = true;
  
 -    public volatile long slow_query_log_timeout_in_ms = 500L;
 +    @Replaces(oldName = "slow_query_log_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
 +    public volatile DurationSpec.LongMillisecondsBound slow_query_log_timeout 
= new DurationSpec.LongMillisecondsBound("500ms");
  
      public volatile double phi_convict_threshold = 8.0;
  
diff --cc src/java/org/apache/cassandra/net/Verb.java
index a562b37fb6,9d8b76dd35..d50a187fda
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@@ -158,22 -147,22 +158,22 @@@ public enum Ver
      SCHEMA_VERSION_REQ     (20,  P1, rpcTimeout,      MIGRATION,         () 
-> NoPayload.serializer,                 () -> 
SchemaVersionVerbHandler.instance,   SCHEMA_VERSION_RSP  ),
  
      // repair; mostly doesn't use callbacks and sends responses as their own 
request messages, with matching sessions by uuid; should eventually harmonize 
and make idiomatic
 -    REPAIR_RSP             (100, P1, repairMsgTimeout,REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 -    VALIDATION_RSP         (102, P1, longTimeout     ,ANTI_ENTROPY,      () 
-> ValidationResponse.serializer,        () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    VALIDATION_REQ         (101, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> ValidationRequest.serializer,         () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    SYNC_RSP               (104, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> SyncResponse.serializer,              () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    SYNC_REQ               (103, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> SyncRequest.serializer,               () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    PREPARE_MSG            (105, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> PrepareMessage.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    SNAPSHOT_MSG           (106, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> SnapshotMessage.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    CLEANUP_MSG            (107, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> CleanupMessage.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    PREPARE_CONSISTENT_RSP (109, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> PrepareConsistentResponse.serializer, () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    PREPARE_CONSISTENT_REQ (108, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> PrepareConsistentRequest.serializer,  () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    FINALIZE_PROPOSE_MSG   (110, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> FinalizePropose.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    FINALIZE_PROMISE_MSG   (111, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> FinalizePromise.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    FINALIZE_COMMIT_MSG    (112, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> FinalizeCommit.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    FAILED_SESSION_MSG     (113, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> FailSession.serializer,               () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    STATUS_RSP             (115, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> StatusResponse.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 -    STATUS_REQ             (114, P1, repairMsgTimeout,ANTI_ENTROPY,      () 
-> StatusRequest.serializer,             () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    REPAIR_RSP             (100, P1, repairTimeout,   REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
-     VALIDATION_RSP         (102, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> ValidationResponse.serializer,        () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
++    VALIDATION_RSP         (102, P1, longTimeout,     ANTI_ENTROPY,      () 
-> ValidationResponse.serializer,        () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    VALIDATION_REQ         (101, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> ValidationRequest.serializer,         () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    SYNC_RSP               (104, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> SyncResponse.serializer,              () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    SYNC_REQ               (103, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> SyncRequest.serializer,               () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    PREPARE_MSG            (105, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> PrepareMessage.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    SNAPSHOT_MSG           (106, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> SnapshotMessage.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    CLEANUP_MSG            (107, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> CleanupMessage.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    PREPARE_CONSISTENT_RSP (109, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> PrepareConsistentResponse.serializer, () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    PREPARE_CONSISTENT_REQ (108, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> PrepareConsistentRequest.serializer,  () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    FINALIZE_PROPOSE_MSG   (110, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> FinalizePropose.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    FINALIZE_PROMISE_MSG   (111, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> FinalizePromise.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    FINALIZE_COMMIT_MSG    (112, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> FinalizeCommit.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    FAILED_SESSION_MSG     (113, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> FailSession.serializer,               () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    STATUS_RSP             (115, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> StatusResponse.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    STATUS_REQ             (114, P1, repairTimeout,   ANTI_ENTROPY,      () 
-> StatusRequest.serializer,             () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
  
      REPLICATION_DONE_RSP   (82,  P0, rpcTimeout,      MISC,              () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
      REPLICATION_DONE_REQ   (22,  P0, rpcTimeout,      MISC,              () 
-> NoPayload.serializer,                 () -> 
ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index da93413668,7488f2ebc1..58612f778a
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -33,8 -31,6 +33,7 @@@ import org.apache.cassandra.repair.stat
  import org.apache.cassandra.schema.TableId;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.streaming.PreviewKind;
- import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.TimeUUID;
  
  import static org.apache.cassandra.net.Verb.VALIDATION_RSP;
  
@@@ -148,14 -124,17 +147,16 @@@ public class RepairMessageVerbHandler i
                      break;
  
                  case VALIDATION_REQ:
 +                {
+                     // notify initiator that the message has been received, 
allowing this method to take as long as it needs to
+                     MessagingService.instance().send(message.emptyResponse(), 
message.from());
                      ValidationRequest validationRequest = (ValidationRequest) 
message.payload;
                      logger.debug("Validating {}", validationRequest);
 -                    // trigger read-only compaction
 -                    ColumnFamilyStore store = 
ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
 -                    if (store == null)
 +
 +                    ParticipateState participate = 
ActiveRepairService.instance.participate(desc.parentSessionId);
 +                    if (participate == null)
                      {
 -                        logger.error("Table {}.{} was dropped during snapshot 
phase of repair {}",
 -                                     desc.keyspace, desc.columnFamily, 
desc.parentSessionId);
 -                        
MessagingService.instance().send(Message.out(VALIDATION_RSP, new 
ValidationResponse(desc)), message.from());
 +                        logErrorAndSendFailureResponse("Unknown repair " + 
desc.parentSessionId, message);
                          return;
                      }
  
@@@ -205,7 -145,8 +206,9 @@@
                      break;
  
                  case SYNC_REQ:
 +                {
+                     // notify initiator that the message has been received, 
allowing this method to take as long as it needs to
+                     MessagingService.instance().send(message.emptyResponse(), 
message.from());
                      // forwarded sync request
                      SyncRequest request = (SyncRequest) message.payload;
                      logger.debug("Syncing {}", request);
diff --cc src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 02697b6977,9c6caf46f5..ea50a50068
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@@ -37,9 -38,12 +37,13 @@@ import org.apache.cassandra.streaming.S
  import org.apache.cassandra.streaming.StreamPlan;
  import org.apache.cassandra.streaming.StreamState;
  import org.apache.cassandra.streaming.StreamOperation;
 +import org.apache.cassandra.utils.TimeUUID;
  
+ import static java.util.concurrent.TimeUnit.MILLISECONDS;
+ import static java.util.concurrent.TimeUnit.NANOSECONDS;
  import static org.apache.cassandra.net.Verb.SYNC_RSP;
 -import static org.apache.cassandra.utils.MonotonicClock.approxTime;
++import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
+ 
  
  /**
   * StreamingRepairTask performs data streaming between two remote replicas, 
neither of which is repair coordinator.
diff --cc src/java/org/apache/cassandra/repair/SyncTask.java
index e254d93883,24e206828d..b325eb42ad
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@@ -35,9 -37,9 +37,10 @@@ import org.apache.cassandra.repair.mess
  import org.apache.cassandra.streaming.PreviewKind;
  import org.apache.cassandra.tracing.Tracing;
  
 +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+ import static org.apache.cassandra.net.Verb.SYNC_REQ;
  
 -public abstract class SyncTask extends AbstractFuture<SyncStat> implements 
Runnable
 +public abstract class SyncTask extends AsyncFuture<SyncStat> implements 
Runnable
  {
      private static final Logger logger = 
LoggerFactory.getLogger(SyncTask.class);
  
@@@ -103,4 -105,12 +106,12 @@@
      }
  
      public void abort() {}
+ 
+     void sendRequest(SyncRequest request, InetAddressAndPort to)
+     {
+         RepairMessage.sendMessageWithFailureCB(request,
+                                                SYNC_REQ,
+                                                to,
 -                                               this::setException);
++                                               this::tryFailure);
+     }
  }
diff --cc src/java/org/apache/cassandra/repair/ValidationTask.java
index 616a2d806b,b4aef249c5..2ad17612d8
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@@ -56,8 -52,10 +55,10 @@@ public class ValidationTask extends Asy
       */
      public void run()
      {
-         ValidationRequest request = new ValidationRequest(desc, nowInSec);
-         MessagingService.instance().send(Message.out(VALIDATION_REQ, 
request), endpoint);
+         RepairMessage.sendMessageWithFailureCB(new ValidationRequest(desc, 
nowInSec),
+                                                VALIDATION_REQ,
+                                                endpoint,
 -                                               this::setException);
++                                               this::tryFailure);
      }
  
      /**
diff --cc src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 3137b4e474,165911dc9b..00ce888696
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@@ -17,7 -17,23 +17,23 @@@
   */
  package org.apache.cassandra.repair.messages;
  
 -import java.util.UUID;
 -
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.exceptions.RepairException;
+ import org.apache.cassandra.exceptions.RequestFailureReason;
+ import org.apache.cassandra.gms.Gossiper;
+ import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.net.RequestCallback;
+ import org.apache.cassandra.net.Verb;
  import org.apache.cassandra.repair.RepairJobDesc;
++import org.apache.cassandra.streaming.PreviewKind;
+ import org.apache.cassandra.utils.CassandraVersion;
++import org.apache.cassandra.utils.TimeUUID;
+ 
+ import static org.apache.cassandra.net.MessageFlag.CALL_BACK_ON_FAILURE;
  
  /**
   * Base class of all repair related request/response messages.
@@@ -32,4 -50,49 +50,49 @@@ public abstract class RepairMessag
      {
          this.desc = desc;
      }
+ 
+     public interface RepairFailureCallback
+     {
+         void onFailure(Exception e);
+     }
+ 
+     public static void sendMessageWithFailureCB(RepairMessage request, Verb 
verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
+     {
+         RequestCallback<?> callback = new RequestCallback<Object>()
+         {
+             @Override
+             public void onResponse(Message<Object> msg)
+             {
+                 logger.info("[#{}] {} received by {}", 
request.desc.parentSessionId, verb, endpoint);
+                 // todo: at some point we should make repair messages follow 
the normal path, actually using this
+             }
+ 
+             @Override
+             public boolean invokeOnFailure()
+             {
+                 return true;
+             }
+ 
+             public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+             {
+                 logger.error("[#{}] {} failed on {}: {}", 
request.desc.parentSessionId, verb, from, failureReason);
+ 
+                 if (supportsTimeouts(from, request.desc.parentSessionId))
 -                    failureCallback.onFailure(new 
RepairException(request.desc, String.format("Got %s failure from %s: %s", verb, 
from, failureReason)));
++                    
failureCallback.onFailure(RepairException.error(request.desc, PreviewKind.NONE, 
String.format("Got %s failure from %s: %s", verb, from, failureReason)));
+             }
+         };
+ 
+         
MessagingService.instance().sendWithCallback(Message.outWithFlag(verb, request, 
CALL_BACK_ON_FAILURE),
+                                                      endpoint,
+                                                      callback);
+     }
+ 
 -    private static boolean supportsTimeouts(InetAddressAndPort from, UUID 
parentSessionId)
++    private static boolean supportsTimeouts(InetAddressAndPort from, TimeUUID 
parentSessionId)
+     {
+         CassandraVersion remoteVersion = 
Gossiper.instance.getReleaseVersion(from);
+         if (remoteVersion != null && 
remoteVersion.compareTo(SUPPORTS_TIMEOUTS) >= 0)
+             return true;
+         logger.warn("[#{}] Not failing repair due to remote host {} not 
supporting repair message timeouts (version = {})", parentSessionId, from, 
remoteVersion);
+         return false;
+     }
  }
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 745067d6cd,4a990f662c..68247eaf58
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -108,29 -87,15 +108,27 @@@ import org.apache.cassandra.repair.mess
  import org.apache.cassandra.repair.messages.SyncResponse;
  import org.apache.cassandra.repair.messages.ValidationResponse;
  import org.apache.cassandra.schema.TableId;
 -import org.apache.cassandra.streaming.PreviewKind;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MBeanWrapper;
 +import org.apache.cassandra.utils.MerkleTrees;
  import org.apache.cassandra.utils.Pair;
 -import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.concurrent.Future;
 +import org.apache.cassandra.utils.concurrent.FutureCombiner;
 +import org.apache.cassandra.utils.concurrent.ImmediateFuture;
- import org.checkerframework.checker.nullness.qual.Nullable;
  
  import static com.google.common.collect.Iterables.concat;
  import static com.google.common.collect.Iterables.transform;
 +import static java.util.Collections.synchronizedSet;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 +import static 
org.apache.cassandra.config.Config.RepairCommandPoolFullStrategy.reject;
 +import static org.apache.cassandra.config.DatabaseDescriptor.*;
 +import static org.apache.cassandra.net.Message.out;
  import static org.apache.cassandra.net.Verb.PREPARE_MSG;
 +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 +import static org.apache.cassandra.utils.Simulate.With.MONITORS;
 +import static org.apache.cassandra.utils.Clock.Global.nanoTime;
- import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 +import static 
org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
  
  /**
   * ActiveRepairService is the starting point for manual "active" repairs.
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
index fc058dbfe1,fc058dbfe1..f1266fafaf
--- 
a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
@@@ -22,6 -22,6 +22,7 @@@ import java.io.IOException
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
++import java.util.concurrent.RejectedExecutionException;
  
  import org.junit.AfterClass;
  import org.junit.BeforeClass;
@@@ -78,6 -78,6 +79,8 @@@ public class RepairCoordinatorBase exte
                                .withConfig(c -> c.with(Feature.NETWORK)
                                                  .with(Feature.GOSSIP))
                                .start());
++
++        CLUSTER.setUncaughtExceptionsFilter(throwable -> throwable instanceof 
RejectedExecutionException && "RepairJobTask has shut 
down".equals(throwable.getMessage()));
      }
  
      @AfterClass
diff --cc 
test/distributed/org/apache/cassandra/distributed/upgrade/RepairRequestTimeoutUpgradeTest.java
index 0000000000,0000000000..9ef4fc0c83
new file mode 100644
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/RepairRequestTimeoutUpgradeTest.java
@@@ -1,0 -1,0 +1,58 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.distributed.upgrade;
++
++import org.junit.Test;
++
++import com.vdurmont.semver4j.Semver;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.Feature;
++
++import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
++import static org.apache.cassandra.distributed.shared.AssertUtils.row;
++import static org.apache.cassandra.net.Verb.SYNC_REQ;
++import static org.apache.cassandra.net.Verb.VALIDATION_REQ;
++
++public class RepairRequestTimeoutUpgradeTest extends UpgradeTestBase
++{
++    @Test
++    public void simpleUpgradeWithNetworkAndGossipTest() throws Throwable
++    {
++        new TestCase()
++        .nodes(2)
++        .nodesToUpgrade(1)
++        .withConfig((cfg) -> cfg.with(Feature.NETWORK, 
Feature.GOSSIP).set("repair_request_timeout_in_ms", 1000))
++        .upgrades(v40, v41)
++        .setup((cluster) -> {
++            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
++            for (int i = 0; i < 10; i++)
++                cluster.get(i % 2 + 1).executeInternal("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v) VALUES ("+i+", 1, 1)");
++            cluster.forEach(i -> i.flush(KEYSPACE));
++        })
++        .runAfterNodeUpgrade((cluster, node) -> {
++            cluster.filters().verbs(VALIDATION_REQ.id).drop();
++            cluster.get(2).nodetoolResult("repair", KEYSPACE, 
"-full").asserts().failure();
++            cluster.filters().reset();
++            for (int i = 10; i < 20; i++)
++                cluster.get(i % 2 + 1).executeInternal("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v) VALUES ("+i+", 1, 1)");
++            cluster.forEach(i -> i.flush(KEYSPACE));
++            cluster.get(1).nodetoolResult("repair", KEYSPACE, 
"-full").asserts().success();
++        }).run();
++    }
++}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to