This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch cassandra-6.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-6.0 by this push:
new 802ce7f8b2 Always send TCM commit failures as Messaging failures
802ce7f8b2 is described below
commit 802ce7f8b2b34fab8b7797061d2c313477415464
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Jan 20 15:25:13 2026 +0100
Always send TCM commit failures as Messaging failures
Patch by Alex Petrov; reviewed by Sam Tunnicliffe for CASSANDRA-21457
Co-authored-by: Ariel Weisberg <[email protected]>
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/net/Message.java | 6 +
src/java/org/apache/cassandra/tcm/Commit.java | 21 ++-
.../org/apache/cassandra/tcm/PeerLogFetcher.java | 10 +-
.../org/apache/cassandra/tcm/RemoteProcessor.java | 161 ++++++++++++++-------
src/java/org/apache/cassandra/tcm/Retry.java | 5 +
.../test/log/CommitStartupByNonCMSNodeTest.java | 83 +++++++++++
7 files changed, 215 insertions(+), 72 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0942f1212b..8ae621f938 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
6.0-alpha2
+ * Always send TCM commit failures as Messaging failures (CASSANDRA-21457)
* Fix ReadCommand serializedSize() using incorrect epoch (CASSANDRA-21438)
* Allocation improvements in ProtocolVersion, StorageProxy and MerkleTree (CASSANDRA-21199)
* Don’t leave autocompaction disabled during bootstrap and replace (CASSANDRA-21236)
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 41f0281706..14b510937f 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -254,6 +254,12 @@ public class Message<T> implements ResponseContext
return outWithParam(nextId(), verb, 0, payload, flag.addTo(0), null,
null);
}
+ public static <T> Message<T> outWithFlag(Verb verb, T payload, MessageFlag
flag, long expireAtNanos)
+ {
+ assert !verb.isResponse();
+ return outWithParam(nextId(), verb, expireAtNanos, payload,
flag.addTo(0), null, null);
+ }
+
public static <T> Message<T> outWithFlag(Verb verb, T payload,
Dispatcher.RequestTime requestTime, MessageFlag flag)
{
return outWithFlags(verb, payload, requestTime, flag.addTo(0));
diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java
index f002b3e66e..6baf00b6aa 100644
--- a/src/java/org/apache/cassandra/tcm/Commit.java
+++ b/src/java/org/apache/cassandra/tcm/Commit.java
@@ -27,6 +27,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.utils.Invariants;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.ExceptionCode;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -281,7 +283,7 @@ public class Commit
}
else
{
- assert t instanceof Failure;
+ Invariants.require(t instanceof Failure);
Failure failure = (Failure) t;
out.writeByte(failure.rejected ? REJECTED : FAILED);
out.writeUnsignedVInt32(failure.code.value);
@@ -327,9 +329,10 @@ public class Commit
}
else
{
- assert t instanceof Failure;
- size += VIntCoding.computeUnsignedVIntSize(((Failure)
t).code.value);
- size += TypeSizes.sizeof(((Failure)t).message);
+ Invariants.require(t instanceof Failure);
+ Failure failure = (Failure) t;
+ size +=
VIntCoding.computeUnsignedVIntSize(failure.code.value);
+ size += TypeSizes.sizeof(failure.message);
size +=
VIntCoding.computeUnsignedVIntSize(serializationVersion.asInt());
size +=
LogState.metadataSerializer.serializedSize(t.logState(), serializationVersion);
}
@@ -375,15 +378,9 @@ public class Commit
Result.Success success = result.success();
replicator.send(success, message.from());
logger.info("Responding with full result {} to sender {}",
result, message.from());
- // TODO: this response message can get lost; how do we
re-discover this on the other side?
- // TODO: what if we have holes after replaying?
- messagingService.accept(message.responseWith(result),
message.from());
- }
- else
- {
- Result.Failure failure = result.failure();
- messagingService.accept(message.responseWith(failure),
message.from());
}
+
+ messagingService.accept(message.responseWith(result),
message.from());
}
private void checkCMSState()
diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
index ceac4c7025..12cdc18e5d 100644
--- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
+++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -92,11 +92,11 @@ public class PeerLogFetcher
logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);
try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time())
{
- RemoteProcessor.sendWithCallbackAsync(fetchRes,
- Verb.TCM_FETCH_PEER_LOG_REQ,
- new FetchPeerLog(before),
- new RemoteProcessor.CandidateIterator(Collections.singletonList(remote), false),
- Retry.untilElapsed(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), TCMMetrics.instance.fetchLogRetries));
+ RemoteProcessor.sendWithRetries(Verb.TCM_FETCH_PEER_LOG_REQ,
+ new FetchPeerLog(before),
+ fetchRes,
+ new RemoteProcessor.CandidateIterator(Collections.singletonList(remote), false),
+ Retry.untilElapsed(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), TCMMetrics.instance.fetchLogRetries));
return fetchRes.map((logState) -> {
log.append(logState);
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 59b63d42e1..b14e73feb8 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import com.codahale.metrics.Timer;
@@ -34,6 +35,7 @@ import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.exceptions.RequestFailureReason;
@@ -42,15 +44,16 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.service.WaitStrategy;
import org.apache.cassandra.tcm.Discovery.DiscoveredNodes;
import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
@@ -78,10 +81,14 @@ public final class RemoteProcessor implements Processor
{
try
{
- Commit.Result result = sendWithCallback(Verb.TCM_COMMIT_REQ,
- new Commit(entryId,
transform, lastKnown),
- new
CandidateIterator(candidates(false)),
- retryPolicy);
+ Commit.Result result = sendWithRetries(Verb.TCM_COMMIT_REQ,
+ new Commit(entryId,
transform, lastKnown),
+ (Commit.Result res) -> {
+ if (res.isFailure() &&
!res.failure().rejected)
+ throw new
IllegalArgumentException(res.failure().message);
+ },
+ new
CandidateIterator(candidates(false)),
+ retryPolicy);
log.append(result.logState());
@@ -171,11 +178,11 @@ public final class RemoteProcessor implements Processor
{
Promise<LogState> remoteRequest = new AsyncPromise<>();
Epoch currentEpoch = log.metadata().epoch;
- sendWithCallbackAsync(remoteRequest,
- Verb.TCM_FETCH_CMS_LOG_REQ,
- new FetchCMSLog(currentEpoch,
ClusterMetadataService.state() == REMOTE),
- candidates,
-
Retry.withNoTimeLimit(TCMMetrics.instance.fetchLogRetries));
+ sendWithRetries(Verb.TCM_FETCH_CMS_LOG_REQ,
+ new FetchCMSLog(currentEpoch,
ClusterMetadataService.state() == REMOTE),
+ remoteRequest,
+ candidates,
+
Retry.withNoTimeLimit(TCMMetrics.instance.fetchLogRetries));
return remoteRequest.map((replay) -> {
if (!replay.isEmpty())
{
@@ -188,14 +195,13 @@ public final class RemoteProcessor implements Processor
}
}
- // todo rename to send with retries or something
- public static <REQ, RSP> RSP sendWithCallback(Verb verb, REQ request,
CandidateIterator candidates, WaitStrategy backoff)
+ public static <REQ, RSP> RSP sendWithRetries(Verb verb, REQ request,
Consumer<RSP> check, CandidateIterator candidates, Retry retry)
{
+ Promise<RSP> future = AsyncPromise.uncancellable();
+ sendWithRetries(verb, request, check, future, candidates, retry);
try
{
- Promise<RSP> promise = new AsyncPromise<>();
- sendWithCallbackAsync(promise, verb, request, candidates, backoff);
- return promise.await().get();
+ return future.get();
}
catch (InterruptedException | ExecutionException e)
{
@@ -203,47 +209,92 @@ public final class RemoteProcessor implements Processor
}
}
- public static <REQ, RSP> void sendWithCallbackAsync(Promise<RSP> promise,
Verb verb, REQ request, CandidateIterator candidates, WaitStrategy backoff)
+ public static <REQ, RSP> void sendWithRetries(Verb verb, REQ request,
Promise<RSP> future, CandidateIterator candidates, Retry retry)
{
- //TODO (now): the retry defines how long to wait for a retry, but the old behavior scheduled the message right away... should this be delayed as well?
- MessagingService.instance().<REQ, RSP>sendWithRetries(backoff, MessageDelivery.ImmediateRetryScheduler.instance,
- verb, request, candidates,
- (attempt, success, failure) -> {
- if (failure != null) promise.tryFailure(failure);
- else promise.trySuccess(success.payload);
- },
- (attempt, from, failure) -> {
- if (promise.isDone() || promise.isCancelled())
- return false;
- if (failure.reason == RequestFailureReason.NOT_CMS)
- {
- logger.debug("{} is not a member of the CMS, querying it to discover current membership", from);
- DiscoveredNodes cms = tryDiscover(from);
- candidates.addCandidates(cms);
- candidates.timeout(from);
- logger.debug("Got CMS from {}: {}, retrying on: {}", from, cms, candidates);
- }
- else
- {
- candidates.timeout(from);
- logger.warn("Got error from {}: {} when sending {}, retrying on {}", from, failure, verb, candidates);
- }
- return true;
- },
- (attempt, reason, from, failure) -> {
- switch (reason)
- {
- case NoMoreCandidates:
- return String.format("Ran out of candidates while sending %s: %s", verb, candidates);
- case GiveUp:
- return String.format("Could not succeed sending %s to %s; policy %s gave up", verb, candidates, backoff);
- case Interrupted:
- case FailedSchedule:
- return null;
- default:
- throw new UnsupportedOperationException(reason.name());
- }
- });
+ sendWithRetries(verb, request, ignore_ -> {}, future, candidates,
retry);
+ }
+
+ /**
+ * Sends a request to given candidate nodes with retries, respecting the retry policy.
+ *
+ * If request handler expects node to be CMS, handles CMS discovery if a candidate node reports it's not a CMS member.
+ */
+ public static <REQ, RSP> void sendWithRetries(Verb verb, REQ request, Consumer<RSP> check, Promise<RSP> future, CandidateIterator candidates, Retry retry)
+ {
+ if (!candidates.hasNext())
+ {
+ future.setFailure(new
MessageDelivery.NoMoreCandidatesException(String.format("Ran out of candidates
while sending %s: %s", verb, candidates)));
+ return;
+ }
+ else if (retry.hasExpired())
+ {
+ future.setFailure(new
MessageDelivery.GivingUpException(retry.attempts(), String.format("Could not
succeed sending %s to %s; policy %s gave up", verb, candidates, retry)));
+ return;
+ }
+
+ InetAddressAndPort candidate = candidates.next();
+ long waitNanos = Math.min(verb.expiresAfterNanos(), Math.max(0,
retry.remainingNanos()));
+ Message<REQ> msg = Message.outWithFlag(verb, request,
MessageFlag.CALL_BACK_ON_FAILURE, Clock.Global.nanoTime() + waitNanos);
+ MessagingService.instance().sendWithCallback(msg, candidate, new
RequestCallbackWithFailure<RSP>()
+ {
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailure
failure)
+ {
+ switch (failure.reason)
+ {
+ case UNKNOWN:
+ candidates.timeout(candidate);
+ logger.warn("Got error from {}: {} when sending {},
retrying on {}", from, failure, verb, candidates);
+ break;
+ case TIMEOUT:
+ candidates.timeout(candidate);
+ logger.warn("Got error from {}: timeout when sending
{}, retrying on {}", candidate, verb, candidates);
+ break;
+ case NOT_CMS:
+ logger.debug("{} is not a member of the CMS, querying
it to discover current membership", from);
+ DiscoveredNodes cms = tryDiscover(from);
+ candidates.addCandidates(cms);
+ candidates.timeout(from);
+ logger.debug("Got CMS from {}: {}, retrying on: {}",
from, cms, candidates);
+ break;
+ default:
+ // Unknown exception - add candidate back and retry
+ candidates.timeout(candidate);
+ logger.warn("Unexpected error ({}) sending {} to {},
retrying", failure, verb, candidate);
+ }
+
+ if (Thread.currentThread().isInterrupted()) // preserve
interrupt status
+ {
+ Thread.currentThread().interrupt();
+ future.setFailure(new InterruptedException("Interrupted
while sending " + verb));
+ }
+ else
+ {
+ long waitFor = retry.computeWait();
+ if (waitFor < 0)
+ {
+ // The retry strategy returns a negative wait to
signal that its attempt budget is exhausted.
+ future.setFailure(new
MessageDelivery.GivingUpException(retry.attempts(), String.format("Could not
succeed sending %s to %s; policy %s exhausted attempts", verb, candidates,
retry)));
+ return;
+ }
+ ScheduledExecutors.nonPeriodicTasks.schedule(() ->
sendWithRetries(verb, request, check, future, candidates, retry), waitFor,
TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public void onResponse(Message<RSP> msg)
+ {
+ try
+ {
+ check.accept(msg.payload);
+ future.setSuccess(msg.payload);
+ }
+ catch (Throwable t)
+ {
+ onFailure(msg.from(), new
RequestFailure(RequestFailureReason.UNKNOWN, t));
+ }
+ }
+ });
}
private static DiscoveredNodes tryDiscover(InetAddressAndPort ep)
diff --git a/src/java/org/apache/cassandra/tcm/Retry.java b/src/java/org/apache/cassandra/tcm/Retry.java
index f4951bf110..be79f10f26 100644
--- a/src/java/org/apache/cassandra/tcm/Retry.java
+++ b/src/java/org/apache/cassandra/tcm/Retry.java
@@ -85,6 +85,11 @@ public class Retry implements WaitStrategy
return true;
}
+ public long computeWait()
+ {
+ return computeWait(attempts, TimeUnit.MILLISECONDS);
+ }
+
@Override
public long computeWaitUntil(int attempts)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CommitStartupByNonCMSNodeTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CommitStartupByNonCMSNodeTest.java
new file mode 100644
index 0000000000..cda51ac274
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/CommitStartupByNonCMSNodeTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.Random;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.transformations.Startup;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.fail;
+
+public class CommitStartupByNonCMSNodeTest extends FuzzTestBase
+{
+ @Test
+ public void commitStartupByNonCMSNode() throws Throwable
+ {
+ try (Cluster cluster = Cluster.build(4)
+ .withConfig(conf -> conf.set("request_timeout", "1000ms") // for TCM commit
+ .set("cms_retry_delay", "10ms,retries=1") // avoid retries by CMS node to avoid triggering log fetch
+ .set("write_request_timeout", "100ms") // time out paxos writes quickly
+ .with(Feature.NETWORK, Feature.GOSSIP))
+ .start())
+ {
+ cluster.setUncaughtExceptionsFilter(t -> t.getMessage() != null && t.getMessage().contains("There are not enough nodes in dc0 datacenter to satisfy replication factor"));
+ Random rnd = new Random(2);
+ Supplier<Integer> nodeSelector = () -> rnd.nextInt(cluster.size()
- 1) + 1;
+ cluster.get(nodeSelector.get()).nodetoolResult("cms",
"reconfigure", "3").asserts().success();
+ for (int i = 2; i <= 3; i++)
+ ClusterUtils.stopUnchecked(cluster.get(i));
+
+ Thread startNode2 = new Thread(() -> {
+ cluster.get(2).startup();
+ });
+
+ Thread commitStartupTranformation = new Thread(() -> {
+ cluster.get(4).runOnInstance(() -> {
+ try
+ {
+ ClusterMetadataService.instance().commit(new Startup(ClusterMetadata.current().myNodeId(),
+ new NodeAddresses(FBUtilities.getBroadcastAddressAndPort()),
+ NodeVersion.CURRENT));
+ }
+ catch (Throwable t)
+ {
+ fail("Should not happen");
+ }
+ });
+ });
+ commitStartupTranformation.start();
+ Thread.sleep(10_000);
+ startNode2.start();
+
+ commitStartupTranformation.join();
+ startNode2.join();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]