This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new dca76145c2 Add retries to IR messages
dca76145c2 is described below
commit dca76145c2c1f846ed624c93b9c64484ce1946b7
Author: David Capwell <[email protected]>
AuthorDate: Tue Oct 31 13:56:29 2023 -0700
Add retries to IR messages
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18962
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/net/Verb.java | 12 +--
.../cassandra/repair/RepairMessageVerbHandler.java | 17 ++-
.../org/apache/cassandra/repair/SharedContext.java | 115 +++++++++++++++++++++
.../repair/consistent/ConsistentSession.java | 2 +-
.../repair/consistent/CoordinatorSession.java | 57 ++++++----
.../repair/consistent/CoordinatorSessions.java | 24 ++++-
.../cassandra/repair/consistent/LocalSessions.java | 77 ++++++++++----
.../cassandra/repair/messages/FailSession.java | 6 ++
.../cassandra/repair/messages/FinalizeCommit.java | 6 ++
.../cassandra/repair/messages/FinalizePromise.java | 6 ++
.../cassandra/repair/messages/FinalizePropose.java | 6 ++
.../repair/messages/PrepareConsistentRequest.java | 6 ++
.../repair/messages/PrepareConsistentResponse.java | 6 ++
.../cassandra/repair/messages/RepairMessage.java | 32 +++++-
.../org/apache/cassandra/net/MatcherResponse.java | 13 ++-
.../repair/ConcurrentIrWithPreviewFuzzTest.java | 18 +++-
.../cassandra/repair/FailingRepairFuzzTest.java | 1 +
.../org/apache/cassandra/repair/FuzzTestBase.java | 36 +++++--
.../cassandra/repair/SlowMessageFuzzTest.java | 1 +
.../repair/consistent/CoordinatorSessionTest.java | 64 ++++++------
.../repair/consistent/CoordinatorSessionsTest.java | 24 +++--
.../repair/consistent/LocalSessionTest.java | 42 +++++---
.../cassandra/repair/consistent/MockMessaging.java | 88 ++++++++++++++++
24 files changed, 524 insertions(+), 136 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 8611719d56..f288f3206e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0-alpha2
+ * Add retries to IR messages (CASSANDRA-18962)
* Add metrics and logging to repair retries (CASSANDRA-18952)
* Remove deprecated code in Cassandra 1.x and 2.x (CASSANDRA-18959)
* ClientRequestSize metrics should not treat CONTAINS restrictions as being
equality-based (CASSANDRA-18896)
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index cf0f2e30a3..c85f0ddeca 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -169,12 +169,12 @@ public enum Verb
PREPARE_MSG (105, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> PrepareMessage.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
SNAPSHOT_MSG (106, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> SnapshotMessage.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
CLEANUP_MSG (107, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> CleanupMessage.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
- PREPARE_CONSISTENT_RSP (109, P1, repairTimeout, ANTI_ENTROPY, () ->
PrepareConsistentResponse.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
- PREPARE_CONSISTENT_REQ (108, P1, repairTimeout, ANTI_ENTROPY, () ->
PrepareConsistentRequest.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
- FINALIZE_PROPOSE_MSG (110, P1, repairTimeout, ANTI_ENTROPY, () ->
FinalizePropose.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
- FINALIZE_PROMISE_MSG (111, P1, repairTimeout, ANTI_ENTROPY, () ->
FinalizePromise.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
- FINALIZE_COMMIT_MSG (112, P1, repairTimeout, ANTI_ENTROPY, () ->
FinalizeCommit.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
- FAILED_SESSION_MSG (113, P1, repairTimeout, ANTI_ENTROPY, () ->
FailSession.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
+ PREPARE_CONSISTENT_RSP (109, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> PrepareConsistentResponse.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
+ PREPARE_CONSISTENT_REQ (108, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> PrepareConsistentRequest.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
+ FINALIZE_PROPOSE_MSG (110, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> FinalizePropose.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
+ FINALIZE_PROMISE_MSG (111, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> FinalizePromise.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
+ FINALIZE_COMMIT_MSG (112, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> FinalizeCommit.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
+ FAILED_SESSION_MSG (113, P1, repairWithBackoffTimeout,
ANTI_ENTROPY, () -> FailSession.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
STATUS_RSP (115, P1, repairTimeout, ANTI_ENTROPY, () ->
StatusResponse.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
STATUS_REQ (114, P1, repairTimeout, ANTI_ENTROPY, () ->
StatusRequest.serializer, () ->
RepairMessageVerbHandler.instance(), REPAIR_RSP ),
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 988159860d..34f20cd708 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.messages.*;
@@ -286,27 +285,28 @@ public class RepairMessageVerbHandler implements
IVerbHandler<RepairMessage>
break;
case PREPARE_CONSISTENT_REQ:
-
ctx.repair().consistent.local.handlePrepareMessage(message.from(),
(PrepareConsistentRequest) message.payload);
+
ctx.repair().consistent.local.handlePrepareMessage(message);
break;
case PREPARE_CONSISTENT_RSP:
-
ctx.repair().consistent.coordinated.handlePrepareResponse((PrepareConsistentResponse)
message.payload);
+
ctx.repair().consistent.coordinated.handlePrepareResponse(message);
break;
case FINALIZE_PROPOSE_MSG:
-
ctx.repair().consistent.local.handleFinalizeProposeMessage(message.from(),
(FinalizePropose) message.payload);
+
ctx.repair().consistent.local.handleFinalizeProposeMessage(message);
break;
case FINALIZE_PROMISE_MSG:
-
ctx.repair().consistent.coordinated.handleFinalizePromiseMessage((FinalizePromise)
message.payload);
+
ctx.repair().consistent.coordinated.handleFinalizePromiseMessage(message);
break;
case FINALIZE_COMMIT_MSG:
-
ctx.repair().consistent.local.handleFinalizeCommitMessage(message.from(),
(FinalizeCommit) message.payload);
+
ctx.repair().consistent.local.handleFinalizeCommitMessage(message);
break;
case FAILED_SESSION_MSG:
FailSession failure = (FailSession) message.payload;
+ sendAck(message);
ctx.repair().consistent.coordinated.handleFailSessionMessage(failure);
ctx.repair().consistent.local.handleFailSessionMessage(message.from(), failure);
break;
@@ -410,12 +410,11 @@ public class RepairMessageVerbHandler implements
IVerbHandler<RepairMessage>
private void sendFailureResponse(Message<?> respondTo)
{
- Message<?> reply =
respondTo.failureResponse(RequestFailureReason.UNKNOWN);
- ctx.messaging().send(reply, respondTo.from());
+ RepairMessage.sendFailureResponse(ctx, respondTo);
}
private void sendAck(Message<RepairMessage> message)
{
- ctx.messaging().send(message.emptyResponse(), message.from());
+ RepairMessage.sendAck(ctx, message);
}
}
diff --git a/src/java/org/apache/cassandra/repair/SharedContext.java
b/src/java/org/apache/cassandra/repair/SharedContext.java
index defc59a83c..8ccc88f584 100644
--- a/src/java/org/apache/cassandra/repair/SharedContext.java
+++ b/src/java/org/apache/cassandra/repair/SharedContext.java
@@ -59,6 +59,16 @@ public interface SharedContext
ScheduledExecutorPlus optionalTasks();
MessageDelivery messaging();
+ default SharedContext withMessaging(MessageDelivery messaging)
+ {
+ return new ForwardingSharedContext(this) {
+ @Override
+ public MessageDelivery messaging()
+ {
+ return messaging;
+ }
+ };
+ }
IFailureDetector failureDetector();
IEndpointSnitch snitch();
IGossiper gossiper();
@@ -162,4 +172,109 @@ public interface SharedContext
return StreamPlan::execute;
}
}
+
+ class ForwardingSharedContext implements SharedContext
+ {
+ private final SharedContext delegate;
+
+ public ForwardingSharedContext(SharedContext delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ protected SharedContext delegate()
+ {
+ return delegate;
+ }
+
+ @Override
+ public InetAddressAndPort broadcastAddressAndPort()
+ {
+ return delegate().broadcastAddressAndPort();
+ }
+
+ @Override
+ public Supplier<Random> random()
+ {
+ return delegate().random();
+ }
+
+ @Override
+ public Clock clock()
+ {
+ return delegate().clock();
+ }
+
+ @Override
+ public ExecutorFactory executorFactory()
+ {
+ return delegate().executorFactory();
+ }
+
+ @Override
+ public MBeanWrapper mbean()
+ {
+ return delegate().mbean();
+ }
+
+ @Override
+ public ScheduledExecutorPlus optionalTasks()
+ {
+ return delegate().optionalTasks();
+ }
+
+ @Override
+ public MessageDelivery messaging()
+ {
+ return delegate().messaging();
+ }
+
+ @Override
+ public IFailureDetector failureDetector()
+ {
+ return delegate().failureDetector();
+ }
+
+ @Override
+ public IEndpointSnitch snitch()
+ {
+ return delegate().snitch();
+ }
+
+ @Override
+ public IGossiper gossiper()
+ {
+ return delegate().gossiper();
+ }
+
+ @Override
+ public ICompactionManager compactionManager()
+ {
+ return delegate().compactionManager();
+ }
+
+ @Override
+ public ActiveRepairService repair()
+ {
+ return delegate().repair();
+ }
+
+ @Override
+ public IValidationManager validationManager()
+ {
+ return delegate().validationManager();
+ }
+
+ @Override
+ public TableRepairManager repairManager(ColumnFamilyStore store)
+ {
+ return delegate().repairManager(store);
+ }
+
+ @Override
+ public StreamExecutor streamExecutor()
+ {
+ return delegate().streamExecutor();
+ }
+ }
}
diff --git
a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index 76645f1adc..8101f02912 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -70,7 +70,7 @@ import org.apache.cassandra.utils.TimeUUID;
* {@code PREPARED}, and a {@link PrepareConsistentResponse} is sent to the
coordinator indicating success or failure.
* If the pending anti-compaction fails, the local session state is set to
{@code FAILED}.
* <p/>
- * (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort,
PrepareConsistentRequest)}
+ * (see {@link
LocalSessions#handlePrepareMessage(org.apache.cassandra.net.Message)}
* <p/>
* Once the coordinator recieves positive {@code PrepareConsistentResponse}
messages from all the participants, the
* coordinator begins the normal repair process.
diff --git
a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 0ca1576df1..6771dfe07d 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -31,6 +31,8 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.repair.CoordinatedRepairResult;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
@@ -47,10 +49,14 @@ import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
-import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import static org.apache.cassandra.repair.messages.RepairMessage.notDone;
+import static org.apache.cassandra.repair.messages.RepairMessage.sendAck;
+import static
org.apache.cassandra.repair.messages.RepairMessage.sendFailureResponse;
+import static
org.apache.cassandra.repair.messages.RepairMessage.sendMessageWithRetries;
+
/**
* Coordinator side logic and state of a consistent repair session. Like
{@link ActiveRepairService.ParentRepairSession},
* there is only one {@code CoordinatorSession} per user repair command,
regardless of the number of tables and token
@@ -154,34 +160,37 @@ public class CoordinatorSession extends ConsistentSession
return getState() == State.FAILED ||
Iterables.any(participantStates.values(), v -> v == State.FAILED);
}
- protected void sendMessage(InetAddressAndPort destination,
Message<RepairMessage> message)
- {
- logger.trace("Sending {} to {}", message.payload, destination);
-
- ctx.messaging().send(message, destination);
- }
-
public Future<Void> prepare()
{
Preconditions.checkArgument(allStates(State.PREPARING));
logger.info("Beginning prepare phase of incremental repair session
{}", sessionID);
- Message<RepairMessage> message =
- Message.out(Verb.PREPARE_CONSISTENT_REQ, new
PrepareConsistentRequest(sessionID, coordinator, participants));
+
+ PrepareConsistentRequest request = new
PrepareConsistentRequest(sessionID, coordinator, participants);
for (final InetAddressAndPort participant : participants)
{
- sendMessage(participant, message);
+ sendMessageWithRetries(ctx,
+ notDone(prepareFuture),
+ request,
+ Verb.PREPARE_CONSISTENT_REQ,
+ participant);
}
return prepareFuture;
}
- public synchronized void handlePrepareResponse(InetAddressAndPort
participant, boolean success)
+ public synchronized void
handlePrepareResponse(Message<PrepareConsistentResponse> msg)
{
+ InetAddressAndPort participant = msg.payload.participant;
+ boolean success = msg.payload.success;
if (getState() == State.FAILED)
{
logger.trace("Incremental repair {} has failed, ignoring prepare
response from {}", sessionID, participant);
+ sendFailureResponse(ctx, msg);
return;
}
+ sendAck(ctx, msg);
+ if (getParticipantState(participant) != State.PREPARING)
+ return;
if (!success)
{
logger.warn("{} failed the prepare phase for incremental repair
session {}", participant, sessionID);
@@ -218,19 +227,29 @@ public class CoordinatorSession extends ConsistentSession
{
Preconditions.checkArgument(allStates(State.REPAIRING));
logger.info("Proposing finalization of repair session {}", sessionID);
- Message<RepairMessage> message =
Message.out(Verb.FINALIZE_PROPOSE_MSG, new FinalizePropose(sessionID));
+ FinalizePropose request = new FinalizePropose(sessionID);
for (final InetAddressAndPort participant : participants)
{
- sendMessage(participant, message);
+ sendMessageWithRetries(ctx, notDone(finalizeProposeFuture),
request, Verb.FINALIZE_PROPOSE_MSG, participant);
}
return finalizeProposeFuture;
}
- public synchronized void handleFinalizePromise(InetAddressAndPort
participant, boolean success)
+ public synchronized void handleFinalizePromise(Message<FinalizePromise>
message)
{
+ InetAddressAndPort participant = message.payload.participant;
+ boolean success = message.payload.promised;
if (getState() == State.FAILED)
{
logger.trace("Incremental repair {} has failed, ignoring finalize
promise from {}", sessionID, participant);
+ sendFailureResponse(ctx, message);
+ return;
+ }
+ sendAck(ctx, message);
+ if (getParticipantState(participant) != State.REPAIRING)
+ {
+ // this message is a retry, or we failed the session; in either
case there is nothing more to do than ack
+ return;
}
else if (!success)
{
@@ -253,10 +272,10 @@ public class CoordinatorSession extends ConsistentSession
{
Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
logger.info("Committing finalization of repair session {}", sessionID);
- Message<RepairMessage> message = Message.out(Verb.FINALIZE_COMMIT_MSG,
new FinalizeCommit(sessionID));
+ FinalizeCommit payload = new FinalizeCommit(sessionID);
for (final InetAddressAndPort participant : participants)
{
- sendMessage(participant, message);
+ sendMessageWithRetries(ctx, payload, Verb.FINALIZE_COMMIT_MSG,
participant);
}
setAll(State.FINALIZED);
logger.info("Incremental repair session {} completed", sessionID);
@@ -264,12 +283,12 @@ public class CoordinatorSession extends ConsistentSession
private void sendFailureMessageToParticipants()
{
- Message<RepairMessage> message = Message.out(Verb.FAILED_SESSION_MSG,
new FailSession(sessionID));
+ FailSession payload = new FailSession(sessionID);
for (final InetAddressAndPort participant : participants)
{
if (participantStates.get(participant) != State.FAILED)
{
- sendMessage(participant, message);
+ sendMessageWithRetries(ctx, payload, Verb.FAILED_SESSION_MSG,
participant);
}
}
}
diff --git
a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
index 1abab8c476..6f02f033d3 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
@@ -24,15 +24,19 @@ import java.util.Set;
import com.google.common.base.Preconditions;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.utils.TimeUUID;
+import static
org.apache.cassandra.repair.messages.RepairMessage.sendFailureResponse;
+
/**
* Container for all consistent repair sessions a node is coordinating
*/
@@ -81,21 +85,31 @@ public class CoordinatorSessions
return sessions.get(sessionId);
}
- public void handlePrepareResponse(PrepareConsistentResponse msg)
+ public void handlePrepareResponse(Message<? extends RepairMessage> msg)
{
- CoordinatorSession session = getSession(msg.parentSession);
+ PrepareConsistentResponse payload = (PrepareConsistentResponse)
msg.payload;
+ CoordinatorSession session = getSession(payload.parentSession);
if (session != null)
{
- session.handlePrepareResponse(msg.participant, msg.success);
+ session.handlePrepareResponse((Message<PrepareConsistentResponse>)
msg);
+ }
+ else
+ {
+ sendFailureResponse(ctx, msg);
}
}
- public void handleFinalizePromiseMessage(FinalizePromise msg)
+ public void handleFinalizePromiseMessage(Message<? extends RepairMessage>
message)
{
+ FinalizePromise msg = (FinalizePromise) message.payload;
CoordinatorSession session = getSession(msg.sessionID);
if (session != null)
{
- session.handleFinalizePromise(msg.participant, msg.promised);
+ session.handleFinalizePromise((Message<FinalizePromise>) message);
+ }
+ else
+ {
+ sendFailureResponse(ctx, message);
}
}
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 85bdf15923..75c50e5dce 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -58,6 +59,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.KeyspaceRepairManager;
import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.repair.consistent.admin.PendingStat;
@@ -105,6 +107,9 @@ import static
org.apache.cassandra.net.Verb.PREPARE_CONSISTENT_RSP;
import static org.apache.cassandra.net.Verb.STATUS_REQ;
import static org.apache.cassandra.net.Verb.STATUS_RSP;
import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+import static org.apache.cassandra.repair.messages.RepairMessage.always;
+import static org.apache.cassandra.repair.messages.RepairMessage.sendAck;
+import static
org.apache.cassandra.repair.messages.RepairMessage.sendFailureResponse;
/**
* Manages all consistent repair sessions a node is participating in.
@@ -335,11 +340,11 @@ public class LocalSessions
sessionID, session.coordinator);
setStateAndSave(session, FAILED);
- Message<FailSession> message = Message.out(FAILED_SESSION_MSG, new
FailSession(sessionID));
+ FailSession payload = new FailSession(sessionID);
for (InetAddressAndPort participant : session.participants)
{
if (!participant.equals(getBroadcastAddressAndPort()))
- sendMessage(participant, message);
+ sendMessageWithRetries(payload, FAILED_SESSION_MSG,
participant);
}
}
@@ -637,10 +642,13 @@ public class LocalSessions
}
@VisibleForTesting
- synchronized void putSessionUnsafe(LocalSession session)
+ synchronized boolean putSessionUnsafe(LocalSession session)
{
+ if (sessions.containsKey(session.sessionID))
+ return false;
putSession(session);
save(session);
+ return true;
}
private synchronized void putSession(LocalSession session)
@@ -696,12 +704,19 @@ public class LocalSessions
@VisibleForTesting
void setStateAndSave(LocalSession session, ConsistentSession.State state)
+ {
+ maybeSetStateAndSave(session, null, state);
+ }
+
+ private boolean maybeSetStateAndSave(LocalSession session, @Nullable
ConsistentSession.State expected, ConsistentSession.State state)
{
synchronized (session)
{
Preconditions.checkArgument(session.getState().canTransitionTo(state),
"Invalid state transition %s -> %s",
session.getState(), state);
+ if (expected != null && session.getState() != expected)
+ return false;
logger.trace("Changing LocalSession state from {} -> {} for {}",
session.getState(), state, session.sessionID);
boolean wasCompleted = session.isCompleted();
session.setState(state);
@@ -714,6 +729,7 @@ public class LocalSessions
}
for (Listener listener : listeners)
listener.onIRStateChange(session);
+ return true;
}
}
@@ -746,7 +762,7 @@ public class LocalSessions
}
if (sendMessage)
{
- sendMessage(session.coordinator,
Message.out(FAILED_SESSION_MSG, new FailSession(session.sessionID)));
+ sendMessageWithRetries(new FailSession(session.sessionID),
FAILED_SESSION_MSG, session.coordinator);
}
}
}
@@ -802,8 +818,10 @@ public class LocalSessions
* successfully. If the data preparation fails, a failure message is sent
to the coordinator,
* cancelling the session.
*/
- public void handlePrepareMessage(InetAddressAndPort from,
PrepareConsistentRequest request)
+ public void handlePrepareMessage(Message<? extends RepairMessage> message)
{
+ InetAddressAndPort from = message.from();
+ PrepareConsistentRequest request = (PrepareConsistentRequest)
message.payload;
logger.trace("received {} from {}", request, from);
TimeUUID sessionID = request.parentSession;
InetAddressAndPort coordinator = request.coordinator;
@@ -817,12 +835,15 @@ public class LocalSessions
catch (Throwable e)
{
logger.error("Error retrieving ParentRepairSession for session {},
responding with failure", sessionID);
- sendMessage(coordinator, Message.out(PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
+ sendFailureResponse(ctx, message);
+ sendMessageWithRetries(always(), new
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false),
PREPARE_CONSISTENT_RSP, coordinator);
return;
}
LocalSession session = createSessionUnsafe(sessionID, parentSession,
peers);
- putSessionUnsafe(session);
+ sendAck(ctx, message);
+ if (!putSessionUnsafe(session))
+ return;
logger.info("Beginning local incremental repair session {}", session);
ExecutorService executor = ctx.executorFactory().pooled("Repair-" +
sessionID, parentSession.getColumnFamilyStores().size());
@@ -841,10 +862,7 @@ public class LocalSessions
logger.info("Prepare phase for incremental repair session
{} completed", sessionID);
if (!prepareSessionExceptFailed(session))
logger.info("Session {} failed before anticompaction
completed", sessionID);
- Message<PrepareConsistentResponse> message =
- Message.out(PREPARE_CONSISTENT_RSP,
- new PrepareConsistentResponse(sessionID,
getBroadcastAddressAndPort(), session.getState() != FAILED));
- sendMessage(coordinator, message);
+ sendMessageWithRetries(always(), new
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(),
session.getState() != FAILED), PREPARE_CONSISTENT_RSP, coordinator);
}
finally
{
@@ -862,9 +880,7 @@ public class LocalSessions
logger.warn("No such repair session: {}", sessionID);
else
logger.error("Prepare phase for incremental repair
session {} failed", sessionID, t);
- sendMessage(coordinator,
- Message.out(PREPARE_CONSISTENT_RSP,
- new
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
+ sendMessageWithRetries(always(), new
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false),
PREPARE_CONSISTENT_RSP, coordinator);
failSession(sessionID, false);
}
finally
@@ -875,6 +891,16 @@ public class LocalSessions
});
}
+ private void sendMessageWithRetries(Supplier<Boolean> allowRetry,
RepairMessage request, Verb verb, InetAddressAndPort endpoint)
+ {
+ RepairMessage.sendMessageWithRetries(ctx, allowRetry, request, verb,
endpoint);
+ }
+
+ private void sendMessageWithRetries(RepairMessage request, Verb verb,
InetAddressAndPort endpoint)
+ {
+ RepairMessage.sendMessageWithRetries(ctx, request, verb, endpoint);
+ }
+
/**
* Checks for the session state, and sets it to prepared unless it is on a
failed state.
* Making the checks inside a synchronized block to prevent the session
state from
@@ -904,21 +930,26 @@ public class LocalSessions
}
}
- public void handleFinalizeProposeMessage(InetAddressAndPort from,
FinalizePropose propose)
+ public void handleFinalizeProposeMessage(Message<? extends RepairMessage>
message)
{
+ InetAddressAndPort from = message.from();
+ FinalizePropose propose = (FinalizePropose) message.payload;
logger.trace("received {} from {}", propose, from);
TimeUUID sessionID = propose.sessionID;
LocalSession session = getSession(sessionID);
if (session == null)
{
logger.info("Received FinalizePropose message for unknown repair
session {}, responding with failure", sessionID);
- sendMessage(from, Message.out(FAILED_SESSION_MSG, new
FailSession(sessionID)));
+ sendFailureResponse(ctx, message);
+ sendMessageWithRetries(new FailSession(sessionID),
FAILED_SESSION_MSG, from);
return;
}
+ sendAck(ctx, message);
try
{
- setStateAndSave(session, FINALIZE_PROMISED);
+ if (!maybeSetStateAndSave(session, REPAIRING, FINALIZE_PROMISED))
+ return;
/*
Flushing the repairs table here, *before* responding to the
coordinator prevents a scenario where we respond
@@ -929,7 +960,7 @@ public class LocalSessions
*/
syncTable();
- sendMessage(from, Message.out(FINALIZE_PROMISE_MSG, new
FinalizePromise(sessionID, getBroadcastAddressAndPort(), true)));
+ RepairMessage.sendMessageWithRetries(ctx, new
FinalizePromise(sessionID, getBroadcastAddressAndPort(), true),
FINALIZE_PROMISE_MSG, from);
logger.info("Received FinalizePropose message for incremental
repair session {}, responded with FinalizePromise", sessionID);
}
catch (IllegalArgumentException e)
@@ -959,19 +990,23 @@ public class LocalSessions
* as part of the compaction process, and avoids having to worry about in
progress compactions interfering with the
* promotion.
*/
- public void handleFinalizeCommitMessage(InetAddressAndPort from,
FinalizeCommit commit)
+ public void handleFinalizeCommitMessage(Message<? extends RepairMessage>
message)
{
+ InetAddressAndPort from = message.from();
+ FinalizeCommit commit = (FinalizeCommit) message.payload;
logger.trace("received {} from {}", commit, from);
TimeUUID sessionID = commit.sessionID;
LocalSession session = getSession(sessionID);
if (session == null)
{
logger.warn("Ignoring FinalizeCommit message for unknown repair
session {}", sessionID);
+ sendFailureResponse(ctx, message);
return;
}
+ sendAck(ctx, message);
- setStateAndSave(session, FINALIZED);
- logger.info("Finalized local repair session {}", sessionID);
+ if (maybeSetStateAndSave(session, FINALIZE_PROMISED, FINALIZED))
+ logger.info("Finalized local repair session {}", sessionID);
}
public void handleFailSessionMessage(InetAddressAndPort from, FailSession
msg)
diff --git a/src/java/org/apache/cassandra/repair/messages/FailSession.java
b/src/java/org/apache/cassandra/repair/messages/FailSession.java
index f6826eec6f..19102fb511 100644
--- a/src/java/org/apache/cassandra/repair/messages/FailSession.java
+++ b/src/java/org/apache/cassandra/repair/messages/FailSession.java
@@ -36,6 +36,12 @@ public class FailSession extends RepairMessage
this.sessionID = sessionID;
}
+ @Override
+ public TimeUUID parentRepairSession()
+ {
+ return sessionID;
+ }
+
public boolean equals(Object o)
{
if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
index ca4e6d8a88..d1d9efa601 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
@@ -36,6 +36,12 @@ public class FinalizeCommit extends RepairMessage
this.sessionID = sessionID;
}
+ @Override
+ public TimeUUID parentRepairSession()
+ {
+ return sessionID;
+ }
+
public boolean equals(Object o)
{
if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
index c45f46d48e..ed514d859f 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
@@ -45,6 +45,12 @@ public class FinalizePromise extends RepairMessage
this.promised = promised;
}
+ @Override
+ public TimeUUID parentRepairSession()
+ {
+ return sessionID;
+ }
+
public boolean equals(Object o)
{
if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
index b3c4bfddb9..c1e76e347b 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
@@ -36,6 +36,12 @@ public class FinalizePropose extends RepairMessage
this.sessionID = sessionID;
}
+ @Override
+ public TimeUUID parentRepairSession()
+ {
+ return sessionID;
+ }
+
public boolean equals(Object o)
{
if (this == o) return true;
diff --git
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
index 9ac1461f65..1a3a6ecd38 100644
---
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
+++
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
@@ -50,6 +50,12 @@ public class PrepareConsistentRequest extends RepairMessage
this.participants = ImmutableSet.copyOf(participants);
}
+ @Override
+ public TimeUUID parentRepairSession()
+ {
+ return parentSession;
+ }
+
public boolean equals(Object o)
{
if (this == o) return true;
diff --git
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
index a422d7f9f7..80f62c1c03 100644
---
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
+++
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
@@ -45,6 +45,12 @@ public class PrepareConsistentResponse extends RepairMessage
this.success = success;
}
+ @Override
+ public TimeUUID parentRepairSession()
+ {
+ return parentSession;
+ }
+
public boolean equals(Object o)
{
if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 6411d12c78..f0cbf78f38 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -86,6 +86,13 @@ public abstract class RepairMessage
map.put(Verb.SYNC_REQ, timeoutVersion);
map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
map.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
+ // IR messages
+ map.put(Verb.PREPARE_CONSISTENT_REQ, SUPPORTS_RETRY);
+ map.put(Verb.PREPARE_CONSISTENT_RSP, SUPPORTS_RETRY);
+ map.put(Verb.FINALIZE_PROPOSE_MSG, SUPPORTS_RETRY);
+ map.put(Verb.FINALIZE_PROMISE_MSG, SUPPORTS_RETRY);
+ map.put(Verb.FINALIZE_COMMIT_MSG, SUPPORTS_RETRY);
+ map.put(Verb.FAILED_SESSION_MSG, SUPPORTS_RETRY);
VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map);
EnumSet<Verb> allowsRetry = EnumSet.noneOf(Verb.class);
@@ -96,6 +103,13 @@ public abstract class RepairMessage
allowsRetry.add(Verb.SYNC_RSP);
allowsRetry.add(Verb.SNAPSHOT_MSG);
allowsRetry.add(Verb.CLEANUP_MSG);
+ // IR messages
+ allowsRetry.add(Verb.PREPARE_CONSISTENT_REQ);
+ allowsRetry.add(Verb.PREPARE_CONSISTENT_RSP);
+ allowsRetry.add(Verb.FINALIZE_PROPOSE_MSG);
+ allowsRetry.add(Verb.FINALIZE_PROMISE_MSG);
+ allowsRetry.add(Verb.FINALIZE_COMMIT_MSG);
+ allowsRetry.add(Verb.FAILED_SESSION_MSG);
ALLOWS_RETRY = Collections.unmodifiableSet(allowsRetry);
}
@@ -154,6 +168,11 @@ public abstract class RepairMessage
sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request,
verb, endpoint, NOOP_CALLBACK, 0);
}
+ public static void sendMessageWithRetries(SharedContext ctx,
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb,
InetAddressAndPort endpoint)
+ {
+ sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request,
verb, endpoint, NOOP_CALLBACK, 0);
+ }
+
@VisibleForTesting
static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff,
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb,
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
{
@@ -214,7 +233,7 @@ public abstract class RepairMessage
}
else
{
- noSpam.warn("{} Failure for repair verb " + verb + ";
could not complete within {} attempts", prefix, attempt);
+ noSpam.warn("{} {} failure for repair verb " + verb + ";
could not complete within {} attempts", prefix, reason, attempt);
RepairMetrics.retryFailure(verb);
}
}
@@ -279,4 +298,15 @@ public abstract class RepairMessage
return ErrorHandling.TIMEOUT;
return ErrorHandling.NONE;
}
+
+ public static void sendFailureResponse(SharedContext ctx, Message<?>
respondTo)
+ {
+ Message<?> reply =
respondTo.failureResponse(RequestFailureReason.UNKNOWN);
+ ctx.messaging().send(reply, respondTo.from());
+ }
+
+ public static void sendAck(SharedContext ctx, Message<? extends
RepairMessage> message)
+ {
+ ctx.messaging().send(message.emptyResponse(), message.from());
+ }
}
diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java
b/test/unit/org/apache/cassandra/net/MatcherResponse.java
index d7b37591df..cad03fcc71 100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@ -187,11 +187,18 @@ public class MatcherResponse
Message<?> response = fnResponse.apply(message, to);
if (response != null)
{
- RequestCallbacks.CallbackInfo cb =
MessagingService.instance().callbacks.get(message.id(), to);
- if (cb != null)
- cb.callback.onResponse(response);
+ if (response.verb().isResponse())
+ {
+ RequestCallbacks.CallbackInfo cb =
MessagingService.instance().callbacks.get(message.id(), to);
+ if (cb != null)
+ cb.callback.onResponse(response);
+ else
+ processResponse(response);
+ }
else
+ {
processResponse(response);
+ }
spy.matchingResponse(response);
}
diff --git
a/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
b/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
index eef13dc8b6..54ae644e34 100644
--- a/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
@@ -28,9 +28,11 @@ import accord.utils.Gen;
import accord.utils.Gens;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.RetrySpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.repair.consistent.LocalSessions;
import org.apache.cassandra.repair.state.Completable;
import org.apache.cassandra.utils.Closeable;
+import org.apache.cassandra.utils.FailingBiConsumer;
import org.assertj.core.api.Assertions;
import static accord.utils.Property.qt;
@@ -62,7 +64,7 @@ public class ConcurrentIrWithPreviewFuzzTest extends
FuzzTestBase
// cause a delay in validation to have more failing previews
closeables.add(cluster.nodes.get(pickParticipant(rs,
previewCoordinator, preview)).doValidation(next -> (cfs, validator) -> {
if
(validator.desc.parentSessionId.equals(preview.state.id))
- cluster.unorderedScheduled.schedule(() ->
next.accept(cfs, validator), 1, TimeUnit.HOURS);
+ delayValidation(cluster, ir, next, cfs, validator);
else next.acceptOrFail(cfs, validator);
}));
// make sure listeners don't leak
@@ -88,4 +90,18 @@ public class ConcurrentIrWithPreviewFuzzTest extends
FuzzTestBase
}
});
}
+
+ private void delayValidation(Cluster cluster, RepairCoordinator ir,
FailingBiConsumer<ColumnFamilyStore, Validator> next, ColumnFamilyStore cfs,
Validator validator)
+ {
+ cluster.unorderedScheduled.schedule(() -> {
+ // make sure to wait for IR to complete...
+ Completable.Result result = ir.state.getResult();
+ if (result == null)
+ {
+ delayValidation(cluster, ir, next, cfs, validator);
+ return;
+ }
+ next.accept(cfs, validator);
+ }, 1, TimeUnit.HOURS);
+ }
}
diff --git a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
index 7c6a4601e1..39576c95f0 100644
--- a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
@@ -114,6 +114,7 @@ public class FailingRepairFuzzTest extends FuzzTestBase
}
cluster.processAll();
+
Assertions.assertThat(repair.state.isComplete()).describedAs("Repair job did
not complete, and no work is pending...").isTrue();
Assertions.assertThat(repair.state.getResult().kind).describedAs("Unexpected
state: %s -> %s; example %d", repair.state, repair.state.getResult(),
example).isEqualTo(Completable.Result.Kind.FAILURE);
switch (stage)
{
diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index d29f35799e..fb7702cd82 100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -73,6 +74,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Digest;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.ICompactionManager;
import org.apache.cassandra.db.marshal.EmptyType;
import org.apache.cassandra.db.repair.CassandraTableRepairManager;
@@ -280,6 +282,12 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
createSchema();
}
+ protected void cleanupRepairTables()
+ {
+ for (String table : Arrays.asList(SystemKeyspace.REPAIRS))
+ execute(String.format("TRUNCATE %s.%s",
SchemaConstants.SYSTEM_KEYSPACE_NAME, table));
+ }
+
private void createSchema()
{
// The main reason to use random here with a fixed seed is just to
have a set of tables that are not hard coded.
@@ -329,12 +337,8 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
switch (message.verb())
{
// these messages are not resilent to ephemeral issues
- case PREPARE_CONSISTENT_REQ:
- case PREPARE_CONSISTENT_RSP:
- case FINALIZE_PROPOSE_MSG:
- case FINALIZE_PROMISE_MSG:
- case FINALIZE_COMMIT_MSG:
- case FAILED_SESSION_MSG:
+ case STATUS_REQ:
+ case STATUS_RSP:
noFaults.add(message.id());
return Faults.NONE;
default:
@@ -357,7 +361,10 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
static void assertSuccess(int example, boolean shouldSync,
RepairCoordinator repair)
{
-
Assertions.assertThat(repair.state.getResult()).describedAs("Unexpected state:
%s -> %s; example %d", repair.state, repair.state.getResult(),
example).isEqualTo(Completable.Result.success(repairSuccessMessage(repair)));
+ Completable.Result result = repair.state.getResult();
+ Assertions.assertThat(result)
+ .describedAs("Expected repair to have completed with
success, but is still running... %s; example %d", repair.state,
example).isNotNull()
+ .describedAs("Unexpected state: %s -> %s; example %d",
repair.state, result,
example).isEqualTo(Completable.Result.success(repairSuccessMessage(repair)));
Assertions.assertThat(repair.state.getStateTimesMillis().keySet()).isEqualTo(EnumSet.allOf(CoordinatorState.State.class));
Assertions.assertThat(repair.state.getSessions()).isNotEmpty();
boolean shouldSnapshot = repair.state.options.getParallelism() !=
RepairParallelism.PARALLEL
@@ -643,6 +650,8 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
orderedExecutor =
globalExecutor.configureSequential("ignore").build();
unorderedScheduled = globalExecutor.scheduled("ignored");
+
+
// We run tests in an isolated JVM per class, so not cleaing up is
safe... but if that assumption ever changes, will need to cleanup
Stage.ANTI_ENTROPY.unsafeSetExecutor(orderedExecutor);
Stage.INTERNAL_RESPONSE.unsafeSetExecutor(unorderedScheduled);
@@ -747,7 +756,7 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
}
}
- private static class CallbackContext
+ private class CallbackContext
{
final RequestCallback callback;
@@ -803,6 +812,8 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
CallbackContext cb;
if (callback != null)
{
+ if (callbacks.containsKey(message.id()))
+ throw new AssertionError("Message id " + message.id()
+ " already has a callback");
cb = new CallbackContext(callback);
callbacks.put(message.id(), cb);
}
@@ -854,7 +865,14 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
if (ctx != null)
{
assert ctx == cb;
- ctx.onFailure(to,
RequestFailureReason.TIMEOUT);
+ try
+ {
+ ctx.onFailure(to,
RequestFailureReason.TIMEOUT);
+ }
+ catch (Throwable t)
+ {
+ failures.add(t);
+ }
}
}, message.verb().expiresAfterNanos(),
TimeUnit.NANOSECONDS);
}
diff --git a/test/unit/org/apache/cassandra/repair/SlowMessageFuzzTest.java
b/test/unit/org/apache/cassandra/repair/SlowMessageFuzzTest.java
index 00fb2817cb..03c151ec68 100644
--- a/test/unit/org/apache/cassandra/repair/SlowMessageFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/SlowMessageFuzzTest.java
@@ -39,6 +39,7 @@ public class SlowMessageFuzzTest extends FuzzTestBase
// to avoid unlucky timing issues, retry until success; given enough
retries we should eventually become success
DatabaseDescriptor.getRepairRetrySpec().maxAttempts = new
RetrySpec.MaxAttempt(Integer.MAX_VALUE);
qt().withPure(false).withExamples(10).check(rs -> {
+ cleanupRepairTables();
Cluster cluster = new Cluster(rs);
enableMessageFaults(cluster);
diff --git
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index d10aa16cf8..1f91b99795 100644
---
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.repair.consistent;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -34,6 +33,7 @@ import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -44,8 +44,10 @@ import org.apache.cassandra.repair.CoordinatedRepairResult;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
@@ -82,7 +84,10 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
static InstrumentedCoordinatorSession createInstrumentedSession()
{
- return new InstrumentedCoordinatorSession(createBuilder());
+ MockMessaging msg = new MockMessaging();
+ CoordinatorSession.Builder builder = createBuilder();
+ builder.withContext(SharedContext.Global.instance.withMessaging(msg));
+ return new InstrumentedCoordinatorSession(msg, builder);
}
private static RepairSessionResult createResult(CoordinatorSession
coordinator)
@@ -99,20 +104,11 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
private static class InstrumentedCoordinatorSession extends
CoordinatorSession
{
- public InstrumentedCoordinatorSession(Builder builder)
+ private final Map<InetAddressAndPort, List<RepairMessage>>
sentMessages;
+ public InstrumentedCoordinatorSession(MockMessaging messaging, Builder
builder)
{
super(builder);
- }
-
- Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new
HashMap<>();
-
- protected void sendMessage(InetAddressAndPort destination,
Message<RepairMessage> message)
- {
- if (!sentMessages.containsKey(destination))
- {
- sentMessages.put(destination, new ArrayList<>());
- }
- sentMessages.get(destination).add(message.payload);
+ this.sentMessages = messaging.sentMessages;
}
Runnable onSetRepairing = null;
@@ -242,17 +238,17 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
// participants respond to coordinator, and repair begins once all
participants have responded with success
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
-
- coordinator.handlePrepareResponse(PARTICIPANT1, true);
+
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT1, true)));
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
-
- coordinator.handlePrepareResponse(PARTICIPANT2, true);
+
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT2, true)));
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
// set the setRepairing callback to verify the correct state when it's
called
Assert.assertFalse(coordinator.setRepairingCalled);
coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED,
coordinator.getState());
- coordinator.handlePrepareResponse(PARTICIPANT3, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT3, true)));
Assert.assertTrue(coordinator.setRepairingCalled);
Assert.assertTrue(repairSubmitted.get());
@@ -276,16 +272,16 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
coordinator.sentMessages.clear();
Assert.assertEquals(ConsistentSession.State.REPAIRING,
coordinator.getState());
- coordinator.handleFinalizePromise(PARTICIPANT1, true);
+
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new
FinalizePromise(coordinator.sessionID, PARTICIPANT1, true)));
Assert.assertEquals(ConsistentSession.State.REPAIRING,
coordinator.getState());
- coordinator.handleFinalizePromise(PARTICIPANT2, true);
+
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new
FinalizePromise(coordinator.sessionID, PARTICIPANT2, true)));
Assert.assertEquals(ConsistentSession.State.REPAIRING,
coordinator.getState());
// set the finalizeCommit callback so we can verify the state when
it's called
Assert.assertFalse(coordinator.finalizeCommitCalled);
coordinator.onFinalizeCommit = () ->
Assert.assertEquals(FINALIZE_PROMISED, coordinator.getState());
- coordinator.handleFinalizePromise(PARTICIPANT3, true);
+
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new
FinalizePromise(coordinator.sessionID, PARTICIPANT3, true)));
Assert.assertTrue(coordinator.finalizeCommitCalled);
Assert.assertEquals(ConsistentSession.State.FINALIZED,
coordinator.getState());
@@ -324,16 +320,16 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
// participants respond to coordinator, and repair begins once all
participants have responded with success
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
- coordinator.handlePrepareResponse(PARTICIPANT1, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT1, true)));
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
- coordinator.handlePrepareResponse(PARTICIPANT2, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT2, true)));
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
// set the setRepairing callback to verify the correct state when it's
called
Assert.assertFalse(coordinator.setRepairingCalled);
coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED,
coordinator.getState());
- coordinator.handlePrepareResponse(PARTICIPANT3, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT3, true)));
Assert.assertTrue(coordinator.setRepairingCalled);
Assert.assertTrue(repairSubmitted.get());
@@ -388,14 +384,14 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
// participants respond to coordinator, and repair begins once all
participants have responded
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
- coordinator.handlePrepareResponse(PARTICIPANT1, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT1, true)));
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
Assert.assertEquals(PREPARED,
coordinator.getParticipantState(PARTICIPANT1));
Assert.assertFalse(sessionResult.isDone());
// participant 2 fails to prepare for consistent repair
Assert.assertFalse(coordinator.failCalled);
- coordinator.handlePrepareResponse(PARTICIPANT2, false);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT2, false)));
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
// we should have sent failure messages to the other participants, but
not yet marked them failed internally
assertMessageSent(coordinator, PARTICIPANT1, new
FailSession(coordinator.sessionID));
@@ -411,7 +407,7 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
// last outstanding response should cause repair to complete in failed
state
Assert.assertFalse(coordinator.setRepairingCalled);
coordinator.onSetRepairing = Assert::fail;
- coordinator.handlePrepareResponse(PARTICIPANT3, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT3, true)));
Assert.assertTrue(coordinator.failCalled);
Assert.assertFalse(coordinator.setRepairingCalled);
Assert.assertFalse(repairSubmitted.get());
@@ -453,16 +449,16 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
// participants respond to coordinator, and repair begins once all
participants have responded with success
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
- coordinator.handlePrepareResponse(PARTICIPANT1, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT1, true)));
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
- coordinator.handlePrepareResponse(PARTICIPANT2, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT2, true)));
Assert.assertEquals(ConsistentSession.State.PREPARING,
coordinator.getState());
// set the setRepairing callback to verify the correct state when it's
called
Assert.assertFalse(coordinator.setRepairingCalled);
coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED,
coordinator.getState());
- coordinator.handlePrepareResponse(PARTICIPANT3, true);
+
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT3, true)));
Assert.assertTrue(coordinator.setRepairingCalled);
Assert.assertTrue(repairSubmitted.get());
@@ -486,18 +482,18 @@ public class CoordinatorSessionTest extends
AbstractRepairTest
coordinator.sentMessages.clear();
Assert.assertEquals(ConsistentSession.State.REPAIRING,
coordinator.getState());
- coordinator.handleFinalizePromise(PARTICIPANT1, true);
+
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new
FinalizePromise(coordinator.sessionID, PARTICIPANT1, true)));
Assert.assertEquals(ConsistentSession.State.REPAIRING,
coordinator.getState());
Assert.assertFalse(coordinator.failCalled);
- coordinator.handleFinalizePromise(PARTICIPANT2, false);
+
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new
FinalizePromise(coordinator.sessionID, PARTICIPANT2, false)));
Assert.assertEquals(ConsistentSession.State.FAILED,
coordinator.getState());
Assert.assertTrue(coordinator.failCalled);
// additional success messages should be ignored
Assert.assertFalse(coordinator.finalizeCommitCalled);
coordinator.onFinalizeCommit = Assert::fail;
- coordinator.handleFinalizePromise(PARTICIPANT3, true);
+
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new
FinalizePromise(coordinator.sessionID, PARTICIPANT3, true)));
Assert.assertFalse(coordinator.finalizeCommitCalled);
Assert.assertEquals(ConsistentSession.State.FAILED,
coordinator.getState());
diff --git
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
index 9f0ee4635d..cd9dfba4ed 100644
---
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
+++
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
@@ -26,6 +26,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.repair.AbstractRepairTest;
@@ -60,21 +62,23 @@ public class CoordinatorSessionsTest extends
AbstractRepairTest
int prepareResponseCalls = 0;
InetAddressAndPort preparePeer = null;
boolean prepareSuccess = false;
- public synchronized void handlePrepareResponse(InetAddressAndPort
participant, boolean success)
+ @Override
+ public synchronized void
handlePrepareResponse(Message<PrepareConsistentResponse> msg)
{
prepareResponseCalls++;
- preparePeer = participant;
- prepareSuccess = success;
+ preparePeer = msg.payload.participant;
+ prepareSuccess = msg.payload.success;
}
int finalizePromiseCalls = 0;
InetAddressAndPort promisePeer = null;
boolean promiseSuccess = false;
- public synchronized void handleFinalizePromise(InetAddressAndPort
participant, boolean success)
+ @Override
+ public synchronized void
handleFinalizePromise(Message<FinalizePromise> message)
{
finalizePromiseCalls++;
- promisePeer = participant;
- promiseSuccess = success;
+ promisePeer = message.payload.participant;
+ promiseSuccess = message.payload.promised;
}
int failCalls = 0;
@@ -150,7 +154,7 @@ public class CoordinatorSessionsTest extends
AbstractRepairTest
InstrumentedCoordinatorSession session =
sessions.registerSession(sessionID, PARTICIPANTS, false);
Assert.assertEquals(0, session.prepareResponseCalls);
- sessions.handlePrepareResponse(new
PrepareConsistentResponse(sessionID, PARTICIPANT1, true));
+
sessions.handlePrepareResponse(Message.builder(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(sessionID, PARTICIPANT1, true)).build());
Assert.assertEquals(1, session.prepareResponseCalls);
Assert.assertEquals(PARTICIPANT1, session.preparePeer);
Assert.assertTrue(session.prepareSuccess);
@@ -162,7 +166,7 @@ public class CoordinatorSessionsTest extends
AbstractRepairTest
InstrumentedCoordinatorSessions sessions = new
InstrumentedCoordinatorSessions();
TimeUUID fakeID = nextTimeUUID();
- sessions.handlePrepareResponse(new PrepareConsistentResponse(fakeID,
PARTICIPANT1, true));
+
sessions.handlePrepareResponse(Message.builder(Verb.PREPARE_CONSISTENT_RSP, new
PrepareConsistentResponse(fakeID, PARTICIPANT1, true)).build());
Assert.assertNull(sessions.getSession(fakeID));
}
@@ -175,7 +179,7 @@ public class CoordinatorSessionsTest extends
AbstractRepairTest
InstrumentedCoordinatorSession session =
sessions.registerSession(sessionID, PARTICIPANTS, false);
Assert.assertEquals(0, session.finalizePromiseCalls);
- sessions.handleFinalizePromiseMessage(new FinalizePromise(sessionID,
PARTICIPANT1, true));
+
sessions.handleFinalizePromiseMessage(Message.builder(Verb.FINALIZE_PROMISE_MSG,
new FinalizePromise(sessionID, PARTICIPANT1, true)).build());
Assert.assertEquals(1, session.finalizePromiseCalls);
Assert.assertEquals(PARTICIPANT1, session.promisePeer);
Assert.assertTrue(session.promiseSuccess);
@@ -187,7 +191,7 @@ public class CoordinatorSessionsTest extends
AbstractRepairTest
InstrumentedCoordinatorSessions sessions = new
InstrumentedCoordinatorSessions();
TimeUUID fakeID = nextTimeUUID();
- sessions.handleFinalizePromiseMessage(new FinalizePromise(fakeID,
PARTICIPANT1, true));
+
sessions.handleFinalizePromiseMessage(Message.builder(Verb.FINALIZE_PROMISE_MSG,
new FinalizePromise(fakeID, PARTICIPANT1, true)).build());
Assert.assertNull(sessions.getSession(fakeID));
}
diff --git
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index 0f9912437e..e19f8abca9 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -39,6 +39,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
@@ -130,13 +131,20 @@ public class LocalSessionTest extends AbstractRepairTest
static class InstrumentedLocalSessions extends LocalSessions
{
- Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new
HashMap<>();
+ final Map<InetAddressAndPort, List<RepairMessage>> sentMessages;
public InstrumentedLocalSessions()
{
- super(SharedContext.Global.instance);
+ this(new MockMessaging());
}
+ private InstrumentedLocalSessions(MockMessaging messaging)
+ {
+ super(SharedContext.Global.instance.withMessaging(messaging));
+ sentMessages = messaging.sentMessages;
+ }
+
+ @Override
protected void sendMessage(InetAddressAndPort destination, Message<?
extends RepairMessage> message)
{
if (!sentMessages.containsKey(destination))
@@ -178,7 +186,7 @@ public class LocalSessionTest extends AbstractRepairTest
public LocalSession prepareForTest(TimeUUID sessionID)
{
prepareSessionFuture = new AsyncPromise<>();
- handlePrepareMessage(PARTICIPANT1, new
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+ handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ,
new PrepareConsistentRequest(sessionID, COORDINATOR,
PARTICIPANTS)).from(PARTICIPANT1).build());
prepareSessionFuture.trySuccess(null);
sentMessages.clear();
return getSession(sessionID);
@@ -281,7 +289,7 @@ public class LocalSessionTest extends AbstractRepairTest
// replacing future so we can inspect state before and after anti
compaction callback
sessions.prepareSessionFuture = new AsyncPromise<>();
Assert.assertFalse(sessions.prepareSessionCalled);
- sessions.handlePrepareMessage(PARTICIPANT1, new
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new
PrepareConsistentRequest(sessionID, COORDINATOR,
PARTICIPANTS)).from(PARTICIPANT1).build());
Assert.assertTrue(sessions.prepareSessionCalled);
Assert.assertTrue(sessions.sentMessages.isEmpty());
@@ -316,7 +324,7 @@ public class LocalSessionTest extends AbstractRepairTest
// replacing future so we can inspect state before and after anti
compaction callback
sessions.prepareSessionFuture = new AsyncPromise<>();
Assert.assertFalse(sessions.prepareSessionCalled);
- sessions.handlePrepareMessage(PARTICIPANT1, new
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new
PrepareConsistentRequest(sessionID, COORDINATOR,
PARTICIPANTS)).from(PARTICIPANT1).build());
Assert.assertTrue(sessions.prepareSessionCalled);
Assert.assertTrue(sessions.sentMessages.isEmpty());
@@ -348,7 +356,7 @@ public class LocalSessionTest extends AbstractRepairTest
{
TimeUUID sessionID = nextTimeUUID();
InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
- sessions.handlePrepareMessage(PARTICIPANT1, new
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new
PrepareConsistentRequest(sessionID, COORDINATOR,
PARTICIPANTS)).from(PARTICIPANT1).build());
Assert.assertNull(sessions.getSession(sessionID));
assertMessagesSent(sessions, COORDINATOR, new
PrepareConsistentResponse(sessionID, PARTICIPANT1, false));
}
@@ -372,7 +380,7 @@ public class LocalSessionTest extends AbstractRepairTest
};
sessions.start();
- sessions.handlePrepareMessage(PARTICIPANT1, new
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new
PrepareConsistentRequest(sessionID, COORDINATOR,
PARTICIPANTS)).from(PARTICIPANT1).build());
BooleanSupplier isCancelled = isCancelledRef.get();
Assert.assertNotNull(isCancelled);
@@ -470,7 +478,7 @@ public class LocalSessionTest extends AbstractRepairTest
// should send a promised message to coordinator and set session state
accordingly
sessions.sentMessages.clear();
- sessions.handleFinalizeProposeMessage(COORDINATOR, new
FinalizePropose(sessionID));
+
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
new FinalizePropose(sessionID)).from(COORDINATOR).build());
Assert.assertEquals(FINALIZE_PROMISED, session.getState());
Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
assertMessagesSent(sessions, COORDINATOR, new
FinalizePromise(sessionID, PARTICIPANT1, true));
@@ -492,7 +500,7 @@ public class LocalSessionTest extends AbstractRepairTest
// should fail the session and send a failure message to the
coordinator
sessions.sentMessages.clear();
- sessions.handleFinalizeProposeMessage(COORDINATOR, new
FinalizePropose(sessionID));
+
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
new FinalizePropose(sessionID)).from(COORDINATOR).build());
Assert.assertEquals(FAILED, session.getState());
Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID));
@@ -503,7 +511,7 @@ public class LocalSessionTest extends AbstractRepairTest
{
InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
TimeUUID fakeID = nextTimeUUID();
- sessions.handleFinalizeProposeMessage(COORDINATOR, new
FinalizePropose(fakeID));
+
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
new FinalizePropose(fakeID)).from(COORDINATOR).build());
Assert.assertNull(sessions.getSession(fakeID));
assertMessagesSent(sessions, COORDINATOR, new FailSession(fakeID));
}
@@ -522,12 +530,12 @@ public class LocalSessionTest extends AbstractRepairTest
// create session and move to finalized promised
sessions.prepareForTest(sessionID);
sessions.maybeSetRepairing(sessionID);
- sessions.handleFinalizeProposeMessage(COORDINATOR, new
FinalizePropose(sessionID));
+
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
new FinalizePropose(sessionID)).from(COORDINATOR).build());
Assert.assertEquals(0, (int)
sessions.completedSessions.getOrDefault(sessionID, 0));
sessions.sentMessages.clear();
LocalSession session = sessions.getSession(sessionID);
- sessions.handleFinalizeCommitMessage(PARTICIPANT1, new
FinalizeCommit(sessionID));
+
sessions.handleFinalizeCommitMessage(Message.builder(Verb.FINALIZE_COMMIT_MSG,
new FinalizeCommit(sessionID)).from(PARTICIPANT1).build());
Assert.assertEquals(FINALIZED, session.getState());
Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
@@ -541,7 +549,7 @@ public class LocalSessionTest extends AbstractRepairTest
InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
sessions.start();
TimeUUID fakeID = nextTimeUUID();
- sessions.handleFinalizeCommitMessage(PARTICIPANT1, new
FinalizeCommit(fakeID));
+
sessions.handleFinalizeCommitMessage(Message.builder(Verb.FINALIZE_COMMIT_MSG,
new FinalizeCommit(fakeID)).from(PARTICIPANT1).build());
Assert.assertNull(sessions.getSession(fakeID));
Assert.assertTrue(sessions.sentMessages.isEmpty());
}
@@ -717,7 +725,7 @@ public class LocalSessionTest extends AbstractRepairTest
InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
sessions.start();
sessions.prepareSessionFuture = new AsyncPromise<>(); // prevent
moving to prepared
- sessions.handlePrepareMessage(PARTICIPANT1, new
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new
PrepareConsistentRequest(sessionID, COORDINATOR,
PARTICIPANTS)).from(PARTICIPANT1).build());
LocalSession session = sessions.getSession(sessionID);
Assert.assertNotNull(session);
@@ -744,7 +752,7 @@ public class LocalSessionTest extends AbstractRepairTest
InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
sessions.start();
sessions.prepareSessionFuture = new AsyncPromise<>();
- sessions.handlePrepareMessage(PARTICIPANT1, new
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new
PrepareConsistentRequest(sessionID, COORDINATOR,
PARTICIPANTS)).from(PARTICIPANT1).build());
sessions.prepareSessionFuture.trySuccess(null);
Assert.assertTrue(sessions.isSessionInProgress(sessionID));
@@ -770,8 +778,8 @@ public class LocalSessionTest extends AbstractRepairTest
sessions.prepareForTest(sessionID);
sessions.maybeSetRepairing(sessionID);
- sessions.handleFinalizeProposeMessage(COORDINATOR, new
FinalizePropose(sessionID));
- sessions.handleFinalizeCommitMessage(PARTICIPANT1, new
FinalizeCommit(sessionID));
+
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
new FinalizePropose(sessionID)).from(COORDINATOR).build());
+
sessions.handleFinalizeCommitMessage(Message.builder(Verb.FINALIZE_COMMIT_MSG,
new FinalizeCommit(sessionID)).from(PARTICIPANT1).build());
LocalSession session = sessions.getSession(sessionID);
Assert.assertTrue(session.repairedAt !=
ActiveRepairService.UNREPAIRED_SSTABLE);
diff --git
a/test/unit/org/apache/cassandra/repair/consistent/MockMessaging.java
b/test/unit/org/apache/cassandra/repair/consistent/MockMessaging.java
new file mode 100644
index 0000000000..5a59ef2639
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/MockMessaging.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.ConnectionType;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.utils.concurrent.Future;
+
+class MockMessaging implements MessageDelivery
+{
+ Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new
HashMap<>();
+ Map<InetAddressAndPort, Integer> acks = new HashMap<>();
+ Map<InetAddressAndPort, Integer> failures = new HashMap<>();
+
+ @Override
+ public <REQ> void send(Message<REQ> message, InetAddressAndPort
destination)
+ {
+ if (message.verb() == Verb.REPAIR_RSP && message.payload instanceof
NoPayload)
+ {
+ acks.compute(destination, (ignore, accum) -> accum == null ? 1 :
accum + 1);
+ return;
+ }
+ if (message.verb() == Verb.FAILURE_RSP)
+ {
+ failures.compute(destination, (ignore, accum) -> accum == null ?
1 : accum + 1);
+ return;
+ }
+ if (!(message.payload instanceof RepairMessage))
+ throw new AssertionError("Unexpected message: " + message);
+
+ if (!sentMessages.containsKey(destination))
+ {
+ sentMessages.put(destination, new ArrayList<>());
+ }
+ sentMessages.get(destination).add((RepairMessage) message.payload);
+ }
+
+ @Override
+ public <REQ, RSP> void sendWithCallback(Message<REQ> message,
InetAddressAndPort to, RequestCallback<RSP> cb)
+ {
+ send(message, to);
+ }
+
+ @Override
+ public <REQ, RSP> void sendWithCallback(Message<REQ> message,
InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType
specifyConnection)
+ {
+ send(message, to);
+ }
+
+ @Override
+ public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ>
message, InetAddressAndPort to)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <V> void respond(V response, Message<?> message)
+ {
+ throw new UnsupportedOperationException();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]