This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3e6a551dba TCM: Catch up committing node on rejection
3e6a551dba is described below
commit 3e6a551dbab6ecdc97b99f9ec3118316bfaf1802
Author: Alex Petrov <[email protected]>
AuthorDate: Thu Jan 11 14:18:46 2024 +0100
TCM: Catch up committing node on rejection
Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-19260
---
.../cassandra/tcm/AbstractLocalProcessor.java | 10 +++--
src/java/org/apache/cassandra/tcm/Commit.java | 47 +++++++++++++++++++---
.../org/apache/cassandra/tcm/RemoteProcessor.java | 14 +++++--
...ationSmokeTest.java => LogReplicationTest.java} | 38 ++++++++++++++++-
4 files changed, 94 insertions(+), 15 deletions(-)
diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
index 2d00085f3d..a72b5b664f 100644
--- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.tcm;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
@@ -79,7 +80,7 @@ public abstract class AbstractLocalProcessor implements Processor
{
return maybeFailure(entryId,
lastKnown,
- () -> new
Commit.Result.Failure(result.rejected().code, result.rejected().reason, true));
+ () ->
Commit.Result.rejected(result.rejected().code, result.rejected().reason,
toLogState(lastKnown)));
}
continue;
@@ -118,9 +119,10 @@ public abstract class AbstractLocalProcessor implements Processor
retryPolicy.maybeSleep();
}
}
- return new Commit.Result.Failure(SERVER_ERROR,
- String.format("Could not perform
commit using the following retry stategy: %s", retryPolicy.tries),
- false);
+ return Commit.Result.failed(SERVER_ERROR,
+ String.format("Could not perform commit
after %d/%d tries. Time remaining: %dms",
+ retryPolicy.tries,
retryPolicy.maxTries,
+
TimeUnit.NANOSECONDS.toMillis(retryPolicy.remainingNanos())));
}
public Commit.Result maybeFailure(Entry.Id entryId, Epoch lastKnown,
Supplier<Commit.Result.Failure> orElse)
diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java
index f0146847f1..8871efa5b2 100644
--- a/src/java/org/apache/cassandra/tcm/Commit.java
+++ b/src/java/org/apache/cassandra/tcm/Commit.java
@@ -119,6 +119,9 @@ public class Commit
static volatile Result.Serializer resultSerializerCache;
public interface Result
{
+ IVersionedSerializer<Result> defaultMessageSerializer = new
Serializer(NodeVersion.CURRENT.serializationVersion());
+
+ LogState logState();
boolean isSuccess();
boolean isFailure();
@@ -131,7 +134,6 @@ public class Commit
{
return (Failure) this;
}
- IVersionedSerializer<Result> defaultMessageSerializer = new
Serializer(NodeVersion.CURRENT.serializationVersion());
static IVersionedSerializer<Result> messageSerializer(Version version)
{
@@ -163,6 +165,12 @@ public class Commit
'}';
}
+ @Override
+ public LogState logState()
+ {
+ return logState;
+ }
+
public boolean isSuccess()
{
return true;
@@ -174,6 +182,16 @@ public class Commit
}
}
+ static Failure rejected(ExceptionCode exceptionCode, String reason,
LogState logState)
+ {
+ return new Failure(exceptionCode, reason, logState, true);
+ }
+
+ static Failure failed(ExceptionCode exceptionCode, String message)
+ {
+ return new Failure(exceptionCode, message, LogState.EMPTY, false);
+ }
+
final class Failure implements Result
{
public final ExceptionCode code;
@@ -181,8 +199,9 @@ public class Commit
// Rejection means that we were able to linearize the operation,
// but it was rejected by the internal logic of the transformation.
public final boolean rejected;
+ public final LogState logState;
- public Failure(ExceptionCode code, String message, boolean
rejected)
+ private Failure(ExceptionCode code, String message, LogState
logState, boolean rejected)
{
if (message == null)
message = "";
@@ -190,6 +209,7 @@ public class Commit
// TypeSizes#sizeOf encoder only allows strings that are up to
Short.MAX_VALUE bytes large
this.message = message.substring(0,
Math.min(message.length(), Short.MAX_VALUE));
this.rejected = rejected;
+ this.logState = logState;
}
@Override
@@ -202,6 +222,12 @@ public class Commit
'}';
}
+ @Override
+ public LogState logState()
+ {
+ return logState;
+ }
+
public boolean isSuccess()
{
return false;
@@ -233,7 +259,7 @@ public class Commit
{
out.writeByte(SUCCESS);
out.writeUnsignedVInt32(serializationVersion.asInt());
-
LogState.metadataSerializer.serialize(t.success().logState, out,
serializationVersion);
+ LogState.metadataSerializer.serialize(t.logState(), out,
serializationVersion);
Epoch.serializer.serialize(t.success().epoch, out,
serializationVersion);
}
else
@@ -243,6 +269,8 @@ public class Commit
out.writeByte(failure.rejected ? REJECTED : FAILED);
out.writeUnsignedVInt32(failure.code.value);
out.writeUTF(failure.message);
+ out.writeUnsignedVInt32(serializationVersion.asInt());
+ LogState.metadataSerializer.serialize(t.logState(), out,
serializationVersion);
}
}
@@ -259,8 +287,13 @@ public class Commit
}
else
{
- return new
Failure(ExceptionCode.fromValue(in.readUnsignedVInt32()),
- in.readUTF(),
+ ExceptionCode exceptionCode =
ExceptionCode.fromValue(in.readUnsignedVInt32());
+ String message = in.readUTF();
+ Version deserializationVersion =
Version.fromInt(in.readUnsignedVInt32());
+ LogState delta =
LogState.metadataSerializer.deserialize(in, deserializationVersion);
+ return new Failure(exceptionCode,
+ message,
+ delta,
b == REJECTED);
}
}
@@ -272,7 +305,7 @@ public class Commit
if (t instanceof Success)
{
size +=
VIntCoding.computeUnsignedVIntSize(serializationVersion.asInt());
- size +=
LogState.metadataSerializer.serializedSize(t.success().logState,
serializationVersion);
+ size +=
LogState.metadataSerializer.serializedSize(t.logState(), serializationVersion);
size += Epoch.serializer.serializedSize(t.success().epoch,
serializationVersion);
}
else
@@ -280,6 +313,8 @@ public class Commit
assert t instanceof Failure;
size += VIntCoding.computeUnsignedVIntSize(((Failure)
t).code.value);
size += TypeSizes.sizeof(((Failure)t).message);
+ size +=
VIntCoding.computeUnsignedVIntSize(serializationVersion.asInt());
+ size +=
LogState.metadataSerializer.serializedSize(t.logState(), serializationVersion);
}
return size;
}
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 06772a1ec8..260d151419 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -78,20 +78,26 @@ public final class RemoteProcessor implements Processor
new
CandidateIterator(candidates(false)),
retryPolicy);
+ log.append(result.logState());
+
if (result.isSuccess())
{
Commit.Result.Success success = result.success();
- log.append(success.logState);
log.awaitAtLeast(success.epoch);
}
+ else
+ {
+ log.waitForHighestConsecutive();
+ }
return result;
}
catch (Exception e)
{
- return new Commit.Result.Failure(SERVER_ERROR, e.getMessage() ==
null
- ?
e.getClass().toString()
- : e.getMessage(),
false);
+ return Commit.Result.failed(SERVER_ERROR,
+ e.getMessage() == null
+ ? e.getClass().toString()
+ : e.getMessage());
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java
similarity index 68%
rename from test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java
index 3d0b9a1894..fe35274162 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationSmokeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/LogReplicationTest.java
@@ -24,9 +24,11 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
@@ -42,7 +44,7 @@ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class LogReplicationSmokeTest extends TestBaseImpl
+public class LogReplicationTest extends TestBaseImpl
{
@Test
public void testRequestingPeerWatermarks() throws Throwable
@@ -73,6 +75,40 @@ public class LogReplicationSmokeTest extends TestBaseImpl
}
}
+ @Test
+ public void testCatchUpOnRejection() throws Throwable
+ {
+ try (Cluster cluster = builder().withNodes(3)
+ .withConfig(config ->
config.with(GOSSIP).with(NETWORK))
+ .start())
+ {
+ init(cluster);
+ IInvokableInstance cmsNode = cluster.get(1);
+ ClusterUtils.waitForCMSToQuiesce(cluster, cmsNode);
+
+ cluster.coordinator(1).execute("CREATE KEYSPACE only_once WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",
+ ConsistencyLevel.ONE);
+
+ long cmsEpoch = cluster.get(1).callsOnInstance(() ->
ClusterMetadata.current().epoch.getEpoch()).call();
+ long epochBefore = cluster.get(2).callsOnInstance(() ->
ClusterMetadata.current().epoch.getEpoch()).call();
+ Assert.assertTrue(cmsEpoch > epochBefore);
+ // should get rejected
+ try
+ {
+ cluster.coordinator(2).execute("CREATE KEYSPACE only_once WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",
+ ConsistencyLevel.ONE);
+ Assert.fail("Creation should have failed");
+ }
+ catch (Throwable t)
+ {
+ Assert.assertTrue(t.getMessage().contains("Cannot add existing
keyspace"));
+ System.out.println("t.getMessage() = " + t.getMessage());
+ }
+ long epochAfter = cluster.get(2).callsOnInstance(() ->
ClusterMetadata.current().epoch.getEpoch()).call();
+ Assert.assertTrue(epochAfter > epochBefore);
+ }
+ }
+
private int getConsistentValue(Cluster cluster)
{
Set<Integer> values = new HashSet<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]