This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch cassandra-6.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-6.0 by this push:
     new 802ce7f8b2 Always send TCM commit failures as Messaging failures
802ce7f8b2 is described below

commit 802ce7f8b2b34fab8b7797061d2c313477415464
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Jan 20 15:25:13 2026 +0100

    Always send TCM commit failures as Messaging failures
    
    Patch by Alex Petrov; reviewed by Sam Tunnicliffe for CASSANDRA-21457
    
    Co-authored-by: Ariel Weisberg <[email protected]>
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/net/Message.java     |   6 +
 src/java/org/apache/cassandra/tcm/Commit.java      |  21 ++-
 .../org/apache/cassandra/tcm/PeerLogFetcher.java   |  10 +-
 .../org/apache/cassandra/tcm/RemoteProcessor.java  | 161 ++++++++++++++-------
 src/java/org/apache/cassandra/tcm/Retry.java       |   5 +
 .../test/log/CommitStartupByNonCMSNodeTest.java    |  83 +++++++++++
 7 files changed, 215 insertions(+), 72 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0942f1212b..8ae621f938 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 6.0-alpha2
+ * Always send TCM commit failures as Messaging failures (CASSANDRA-21457)
  * Fix ReadCommand serializedSize() using incorrect epoch (CASSANDRA-21438)
  * Allocation improvements in ProtocolVersion, StorageProxy and MerkleTree (CASSANDRA-21199)
  * Don’t leave autocompaction disabled during bootstrap and replace (CASSANDRA-21236)
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 41f0281706..14b510937f 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -254,6 +254,12 @@ public class Message<T> implements ResponseContext
         return outWithParam(nextId(), verb, 0, payload, flag.addTo(0), null, 
null);
     }
 
+    public static <T> Message<T> outWithFlag(Verb verb, T payload, MessageFlag 
flag, long expireAtNanos)
+    {
+        assert !verb.isResponse();
+        return outWithParam(nextId(), verb, expireAtNanos, payload, 
flag.addTo(0), null, null);
+    }
+
     public static <T> Message<T> outWithFlag(Verb verb, T payload, 
Dispatcher.RequestTime requestTime, MessageFlag flag)
     {
         return outWithFlags(verb, payload, requestTime, flag.addTo(0));
diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java
index f002b3e66e..6baf00b6aa 100644
--- a/src/java/org/apache/cassandra/tcm/Commit.java
+++ b/src/java/org/apache/cassandra/tcm/Commit.java
@@ -27,6 +27,8 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.utils.Invariants;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ExceptionCode;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -281,7 +283,7 @@ public class Commit
                 }
                 else
                 {
-                    assert t instanceof Failure;
+                    Invariants.require(t instanceof Failure);
                     Failure failure = (Failure) t;
                     out.writeByte(failure.rejected ? REJECTED : FAILED);
                     out.writeUnsignedVInt32(failure.code.value);
@@ -327,9 +329,10 @@ public class Commit
                 }
                 else
                 {
-                    assert t instanceof Failure;
-                    size += VIntCoding.computeUnsignedVIntSize(((Failure) 
t).code.value);
-                    size += TypeSizes.sizeof(((Failure)t).message);
+                    Invariants.require(t instanceof Failure);
+                    Failure failure = (Failure) t;
+                    size += 
VIntCoding.computeUnsignedVIntSize(failure.code.value);
+                    size += TypeSizes.sizeof(failure.message);
                     size += 
VIntCoding.computeUnsignedVIntSize(serializationVersion.asInt());
                     size += 
LogState.metadataSerializer.serializedSize(t.logState(), serializationVersion);
                 }
@@ -375,15 +378,9 @@ public class Commit
                 Result.Success success = result.success();
                 replicator.send(success, message.from());
                 logger.info("Responding with full result {} to sender {}", 
result, message.from());
-                // TODO: this response message can get lost; how do we 
re-discover this on the other side?
-                // TODO: what if we have holes after replaying?
-                messagingService.accept(message.responseWith(result), 
message.from());
-            }
-            else
-            {
-                Result.Failure failure = result.failure();
-                messagingService.accept(message.responseWith(failure), 
message.from());
             }
+
+            messagingService.accept(message.responseWith(result), 
message.from());
         }
 
         private void checkCMSState()
diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
index ceac4c7025..12cdc18e5d 100644
--- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
+++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -92,11 +92,11 @@ public class PeerLogFetcher
         logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);
         try (Timer.Context ctx = 
TCMMetrics.instance.fetchPeerLogLatency.time())
         {
-            RemoteProcessor.sendWithCallbackAsync(fetchRes,
-                                                  Verb.TCM_FETCH_PEER_LOG_REQ,
-                                                  new FetchPeerLog(before),
-                                                  new 
RemoteProcessor.CandidateIterator(Collections.singletonList(remote), false),
-                                                  
Retry.untilElapsed(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
 TCMMetrics.instance.fetchLogRetries));
+            RemoteProcessor.sendWithRetries(Verb.TCM_FETCH_PEER_LOG_REQ,
+                                            new FetchPeerLog(before),
+                                            fetchRes,
+                                            new 
RemoteProcessor.CandidateIterator(Collections.singletonList(remote), false),
+                                            
Retry.untilElapsed(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
 TCMMetrics.instance.fetchLogRetries));
 
             return fetchRes.map((logState) -> {
                 log.append(logState);
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 59b63d42e1..b14e73feb8 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import com.codahale.metrics.Timer;
@@ -34,6 +35,7 @@ import com.codahale.metrics.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.exceptions.RequestFailureReason;
@@ -42,15 +44,16 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessageFlag;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.RequestCallbackWithFailure;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.service.WaitStrategy;
 import org.apache.cassandra.tcm.Discovery.DiscoveredNodes;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
@@ -78,10 +81,14 @@ public final class RemoteProcessor implements Processor
     {
         try
         {
-            Commit.Result result = sendWithCallback(Verb.TCM_COMMIT_REQ,
-                                                    new Commit(entryId, 
transform, lastKnown),
-                                                    new 
CandidateIterator(candidates(false)),
-                                                    retryPolicy);
+            Commit.Result result = sendWithRetries(Verb.TCM_COMMIT_REQ,
+                                                   new Commit(entryId, 
transform, lastKnown),
+                                                   (Commit.Result res) -> {
+                                                       if (res.isFailure() && 
!res.failure().rejected)
+                                                           throw new 
IllegalArgumentException(res.failure().message);
+                                                   },
+                                                   new 
CandidateIterator(candidates(false)),
+                                                   retryPolicy);
 
             log.append(result.logState());
 
@@ -171,11 +178,11 @@ public final class RemoteProcessor implements Processor
         {
             Promise<LogState> remoteRequest = new AsyncPromise<>();
             Epoch currentEpoch = log.metadata().epoch;
-            sendWithCallbackAsync(remoteRequest,
-                                  Verb.TCM_FETCH_CMS_LOG_REQ,
-                                  new FetchCMSLog(currentEpoch, 
ClusterMetadataService.state() == REMOTE),
-                                  candidates,
-                                  
Retry.withNoTimeLimit(TCMMetrics.instance.fetchLogRetries));
+            sendWithRetries(Verb.TCM_FETCH_CMS_LOG_REQ,
+                            new FetchCMSLog(currentEpoch, 
ClusterMetadataService.state() == REMOTE),
+                            remoteRequest,
+                            candidates,
+                            
Retry.withNoTimeLimit(TCMMetrics.instance.fetchLogRetries));
             return remoteRequest.map((replay) -> {
                 if (!replay.isEmpty())
                 {
@@ -188,14 +195,13 @@ public final class RemoteProcessor implements Processor
         }
     }
 
-    // todo rename to send with retries or something
-    public static <REQ, RSP> RSP sendWithCallback(Verb verb, REQ request, 
CandidateIterator candidates, WaitStrategy backoff)
+    public static <REQ, RSP> RSP sendWithRetries(Verb verb, REQ request, 
Consumer<RSP> check, CandidateIterator candidates, Retry retry)
     {
+        Promise<RSP> future = AsyncPromise.uncancellable();
+        sendWithRetries(verb, request, check, future, candidates, retry);
         try
         {
-            Promise<RSP> promise = new AsyncPromise<>();
-            sendWithCallbackAsync(promise, verb, request, candidates, backoff);
-            return promise.await().get();
+            return future.get();
         }
         catch (InterruptedException | ExecutionException e)
         {
@@ -203,47 +209,92 @@ public final class RemoteProcessor implements Processor
         }
     }
 
-    public static <REQ, RSP> void sendWithCallbackAsync(Promise<RSP> promise, 
Verb verb, REQ request, CandidateIterator candidates, WaitStrategy backoff)
+    public static <REQ, RSP> void sendWithRetries(Verb verb, REQ request, 
Promise<RSP> future, CandidateIterator candidates, Retry retry)
     {
-        //TODO (now): the retry defines how long to wait for a retry, but the 
old behavior scheduled the message right away... should this be delayed as well?
-        MessagingService.instance().<REQ, RSP>sendWithRetries(backoff, 
MessageDelivery.ImmediateRetryScheduler.instance,
-                                                              verb, request, 
candidates,
-                                                              (attempt, 
success, failure) -> {
-                                                                  if (failure 
!= null) promise.tryFailure(failure);
-                                                                  else 
promise.trySuccess(success.payload);
-                                                              },
-                                                              (attempt, from, 
failure) -> {
-                                                                  if 
(promise.isDone() || promise.isCancelled())
-                                                                      return 
false;
-                                                                  if 
(failure.reason == RequestFailureReason.NOT_CMS)
-                                                                  {
-                                                                      
logger.debug("{} is not a member of the CMS, querying it to discover current 
membership", from);
-                                                                      
DiscoveredNodes cms = tryDiscover(from);
-                                                                      
candidates.addCandidates(cms);
-                                                                      
candidates.timeout(from);
-                                                                      
logger.debug("Got CMS from {}: {}, retrying on: {}", from, cms, candidates);
-                                                                  }
-                                                                  else
-                                                                  {
-                                                                      
candidates.timeout(from);
-                                                                      
logger.warn("Got error from {}: {} when sending {}, retrying on {}", from, 
failure, verb, candidates);
-                                                                  }
-                                                                  return true;
-                                                              },
-                                                              (attempt, 
reason, from, failure) -> {
-                                                                  switch 
(reason)
-                                                                  {
-                                                                      case 
NoMoreCandidates:
-                                                                          
return String.format("Ran out of candidates while sending %s: %s", verb, 
candidates);
-                                                                      case 
GiveUp:
-                                                                          
return String.format("Could not succeed sending %s to %s; policy %s gave up", 
verb, candidates, backoff);
-                                                                      case 
Interrupted:
-                                                                      case 
FailedSchedule:
-                                                                          
return null;
-                                                                      default:
-                                                                          
throw new UnsupportedOperationException(reason.name());
-                                                                  }
-                                                              });
+        sendWithRetries(verb, request, ignore_ -> {}, future, candidates, 
retry);
+    }
+
+    /**
+     * Sends a request to given candidate nodes with retries, respecting the 
retry policy.
+     *
+     * If request handler expects node to be CMS, handles CMS discovery if a 
candidate node reports it's not a CMS member.
+     */
+    public static <REQ, RSP> void sendWithRetries(Verb verb, REQ request, 
Consumer<RSP> check, Promise<RSP> future, CandidateIterator candidates, Retry 
retry)
+    {
+        if (!candidates.hasNext())
+        {
+            future.setFailure(new 
MessageDelivery.NoMoreCandidatesException(String.format("Ran out of candidates 
while sending %s: %s", verb, candidates)));
+            return;
+        }
+        else if (retry.hasExpired())
+        {
+            future.setFailure(new 
MessageDelivery.GivingUpException(retry.attempts(), String.format("Could not 
succeed sending %s to %s; policy %s gave up", verb, candidates, retry)));
+            return;
+        }
+
+        InetAddressAndPort candidate = candidates.next();
+        long waitNanos = Math.min(verb.expiresAfterNanos(), Math.max(0, 
retry.remainingNanos()));
+        Message<REQ> msg = Message.outWithFlag(verb, request, 
MessageFlag.CALL_BACK_ON_FAILURE, Clock.Global.nanoTime() + waitNanos);
+        MessagingService.instance().sendWithCallback(msg, candidate, new 
RequestCallbackWithFailure<RSP>()
+        {
+            @Override
+            public void onFailure(InetAddressAndPort from, RequestFailure 
failure)
+            {
+                switch (failure.reason)
+                {
+                    case UNKNOWN:
+                        candidates.timeout(candidate);
+                        logger.warn("Got error from {}: {} when sending {}, 
retrying on {}", from, failure, verb, candidates);
+                        break;
+                    case TIMEOUT:
+                        candidates.timeout(candidate);
+                        logger.warn("Got error from {}: timeout when sending 
{}, retrying on {}", candidate, verb, candidates);
+                        break;
+                    case NOT_CMS:
+                        logger.debug("{} is not a member of the CMS, querying 
it to discover current membership", from);
+                        DiscoveredNodes cms = tryDiscover(from);
+                        candidates.addCandidates(cms);
+                        candidates.timeout(from);
+                        logger.debug("Got CMS from {}: {}, retrying on: {}", 
from, cms, candidates);
+                        break;
+                    default:
+                        // Unknown exception - add candidate back and retry
+                        candidates.timeout(candidate);
+                        logger.warn("Unexpected error ({}) sending {} to {}, 
retrying", failure, verb, candidate);
+                }
+
+                if (Thread.currentThread().isInterrupted()) // preserve 
interrupt status
+                {
+                    Thread.currentThread().interrupt();
+                    future.setFailure(new InterruptedException("Interrupted 
while sending " + verb));
+                }
+                else
+                {
+                    long waitFor = retry.computeWait();
+                    if (waitFor < 0)
+                    {
+                        // The retry strategy returns a negative wait to 
signal that its attempt budget is exhausted.
+                        future.setFailure(new 
MessageDelivery.GivingUpException(retry.attempts(), String.format("Could not 
succeed sending %s to %s; policy %s exhausted attempts", verb, candidates, 
retry)));
+                        return;
+                    }
+                    ScheduledExecutors.nonPeriodicTasks.schedule(() -> 
sendWithRetries(verb, request, check, future, candidates, retry), waitFor, 
TimeUnit.MILLISECONDS);
+                }
+            }
+
+            @Override
+            public void onResponse(Message<RSP> msg)
+            {
+                try
+                {
+                    check.accept(msg.payload);
+                    future.setSuccess(msg.payload);
+                }
+                catch (Throwable t)
+                {
+                    onFailure(msg.from(), new 
RequestFailure(RequestFailureReason.UNKNOWN, t));
+                }
+            }
+        });
     }
 
     private static DiscoveredNodes tryDiscover(InetAddressAndPort ep)
diff --git a/src/java/org/apache/cassandra/tcm/Retry.java b/src/java/org/apache/cassandra/tcm/Retry.java
index f4951bf110..be79f10f26 100644
--- a/src/java/org/apache/cassandra/tcm/Retry.java
+++ b/src/java/org/apache/cassandra/tcm/Retry.java
@@ -85,6 +85,11 @@ public class Retry implements WaitStrategy
         return true;
     }
 
+    public long computeWait()
+    {
+        return computeWait(attempts, TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public long computeWaitUntil(int attempts)
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CommitStartupByNonCMSNodeTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CommitStartupByNonCMSNodeTest.java
new file mode 100644
index 0000000000..cda51ac274
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/CommitStartupByNonCMSNodeTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.Random;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.transformations.Startup;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.fail;
+
+public class CommitStartupByNonCMSNodeTest extends FuzzTestBase
+{
+    @Test
+    public void commitStartupByNonCMSNode() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(4)
+                                      .withConfig(conf -> 
conf.set("request_timeout", "1000ms") // for TCM commit
+                                                              
.set("cms_retry_delay", "10ms,retries=1") // avoid retries by CMS node to avoid 
triggering log fetch
+                                                              
.set("write_request_timeout", "100ms") // time out paxos writes quickly
+                                                              
.with(Feature.NETWORK, Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.setUncaughtExceptionsFilter(t -> t.getMessage() != null && 
t.getMessage().contains("There are not enough nodes in dc0 datacenter to 
satisfy replication factor"));
+            Random rnd = new Random(2);
+            Supplier<Integer> nodeSelector = () -> rnd.nextInt(cluster.size() 
- 1) + 1;
+            cluster.get(nodeSelector.get()).nodetoolResult("cms", 
"reconfigure", "3").asserts().success();
+            for (int i = 2; i <= 3; i++)
+                ClusterUtils.stopUnchecked(cluster.get(i));
+
+            Thread startNode2 = new Thread(() -> {
+                cluster.get(2).startup();
+            });
+
+            Thread commitStartupTranformation = new Thread(() -> {
+                cluster.get(4).runOnInstance(() -> {
+                    try
+                    {
+                        ClusterMetadataService.instance().commit(new 
Startup(ClusterMetadata.current().myNodeId(),
+                                                                             
new NodeAddresses(FBUtilities.getBroadcastAddressAndPort()),
+                                                                             
NodeVersion.CURRENT));
+                    }
+                    catch (Throwable t)
+                    {
+                        fail("Should not happen");
+                    }
+                });
+            });
+            commitStartupTranformation.start();
+            Thread.sleep(10_000);
+            startNode2.start();
+
+            commitStartupTranformation.join();
+            startNode2.join();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to