This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new 2af0a2cafb CEP-45: Query forwarding
2af0a2cafb is described below
commit 2af0a2cafb1040e5215efd9066458642411f840f
Author: Abe Ratnofsky <[email protected]>
AuthorDate: Tue Mar 25 12:41:57 2025 -0400
CEP-45: Query forwarding
Patch by Abe Ratnofsky; Reviewed by Blake Eggleston for CASSANDRA-20309
---
CHANGES.txt | 1 +
.../apache/cassandra/db/MutationVerbHandler.java | 20 +-
src/java/org/apache/cassandra/net/Message.java | 7 +
.../org/apache/cassandra/net/MessagingService.java | 10 +
src/java/org/apache/cassandra/net/ParamType.java | 5 +-
.../org/apache/cassandra/net/RequestCallbacks.java | 6 +-
src/java/org/apache/cassandra/net/Verb.java | 2 +
.../cassandra/replication/CoordinatorLog.java | 3 +
.../cassandra/replication/CoordinatorLogId.java | 19 +
.../cassandra/replication/ForwardedWrite.java | 479 +++++++++++++++++++++
.../cassandra/replication/LocalMutationStates.java | 1 +
.../apache/cassandra/replication/MutationId.java | 2 +-
.../cassandra/replication/MutationSummary.java | 1 +
.../replication/MutationTrackingService.java | 6 +
.../org/apache/cassandra/replication/Shard.java | 4 +-
.../cassandra/replication/TrackedWriteRequest.java | 86 ++--
.../service/TrackedWriteResponseHandler.java | 3 +-
.../apache/cassandra/tcm/membership/NodeId.java | 22 +
.../MutationTrackingWriteForwardingTest.java | 117 +++++
19 files changed, 756 insertions(+), 38 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index fb9f8fbeed..0bc461129a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
cep-45-mutation-tracking
+ * CEP-45: Query forwarding (CASSANDRA-20309)
* Fix mutation tracking startup (CASSANDRA-20540)
* Mutation tracking journal integration, read, and write path
(CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308)
* Introduce MutationJournal for coordinator logs (CASSANDRA-20353)
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index c30fae63b4..96e3fa7e7e 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -17,9 +17,13 @@
*/
package org.apache.cassandra.db;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.*;
+import org.apache.cassandra.replication.ForwardedWrite;
import org.apache.cassandra.tracing.Tracing;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -28,12 +32,24 @@ import static
org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
public class MutationVerbHandler extends AbstractMutationVerbHandler<Mutation>
{
+ private static final Logger logger =
LoggerFactory.getLogger(MutationVerbHandler.class);
+
public static final MutationVerbHandler instance = new
MutationVerbHandler();
- private void respond(Message<?> respondTo, InetAddressAndPort
respondToAddress)
+ private void respond(Message<?> incoming, InetAddressAndPort
respondToAddress)
{
+ // Local tracked writes respond in TrackedWriteResponseHandler
+ Message<NoPayload> response = incoming.emptyResponse();
Tracing.trace("Enqueuing response to {}", respondToAddress);
- MessagingService.instance().send(respondTo.emptyResponse(),
respondToAddress);
+ logger.trace("Enqueuing response to {}", respondToAddress);
+ MessagingService.instance().send(response, respondToAddress);
+
+ ForwardedWrite.CoordinatorAckInfo ackTo =
(ForwardedWrite.CoordinatorAckInfo)
incoming.header.params().get(ParamType.COORDINATOR_ACK_INFO);
+ if (ackTo != null)
+ {
+ logger.trace("Enqueuing response for direct acknowledgement of
forwarded tracked mutation to coordinator {}", ackTo.coordinator);
+ MessagingService.instance().send(response, ackTo.coordinator);
+ }
}
private void failed()
diff --git a/src/java/org/apache/cassandra/net/Message.java
b/src/java/org/apache/cassandra/net/Message.java
index 303a86e848..9f6cb7a309 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -725,6 +725,13 @@ public class Message<T>
return this;
}
+ public Builder<T> withRequestTime(Dispatcher.RequestTime requestTime)
+ {
+ this.createdAtNanos = requestTime.startedAtNanos();
+ this.expiresAtNanos =
requestTime.computeDeadline(verb.expiresAfterNanos());
+ return this;
+ }
+
public Message<T> build()
{
if (verb == null)
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java
b/src/java/org/apache/cassandra/net/MessagingService.java
index 97a019642a..7ddada9e0a 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +44,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.MessagingMetrics;
+import org.apache.cassandra.replication.ForwardedWrite;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
@@ -449,6 +451,14 @@ public class MessagingService extends
MessagingServiceMBeanImpl implements Messa
send(message, to.endpoint(), null);
}
+ public void sendForwardedWriteWithCallback(Message message, Replica to,
ForwardedWrite.LeaderCallback handler)
+ {
+ Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ);
+ Preconditions.checkArgument(message.callBackOnFailure());
+ callbacks.addWithExpiration(handler, message, to.endpoint());
+ send(message, to.endpoint(), null);
+ }
+
/**
* Send a message to a given endpoint. This method adheres to the fire and
forget
* style messaging.
diff --git a/src/java/org/apache/cassandra/net/ParamType.java
b/src/java/org/apache/cassandra/net/ParamType.java
index 2367b1a390..e01a3ac869 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
import javax.annotation.Nullable;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.replication.ForwardedWrite;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Int32Serializer;
import org.apache.cassandra.utils.Int64Serializer;
@@ -54,7 +55,9 @@ public enum ParamType
ROW_INDEX_READ_SIZE_WARN (13, Int64Serializer.serializer),
CUSTOM_MAP (14, CustomParamsSerializer.serializer),
TOO_MANY_REFERENCED_INDEXES_WARN (16, Int32Serializer.serializer),
- TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer);
+ TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer),
+ // Different from RESPOND_TO because it's an additional recipient of the
acknowledgement
+ COORDINATOR_ACK_INFO (18,
ForwardedWrite.CoordinatorAckInfo.serializer);
final int id;
final IVersionedSerializer serializer;
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java
b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index ee63c5a3e6..4a46b73b5a 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.InternodeOutboundMetrics;
+import org.apache.cassandra.replication.ForwardedWrite;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import static java.lang.String.format;
@@ -96,14 +98,14 @@ public class RequestCallbacks implements
OutboundMessageCallbacks
public void addWithExpiration(RequestCallback<?> cb, Message<?> message,
InetAddressAndPort to)
{
// mutations need to call the overload
- assert message.verb() != Verb.MUTATION_REQ && message.verb() !=
Verb.COUNTER_MUTATION_REQ;
+ Preconditions.checkArgument((message.verb() != Verb.MUTATION_REQ &&
message.verb() != Verb.COUNTER_MUTATION_REQ) || (cb instanceof
ForwardedWrite.LeaderCallback));
CallbackInfo previous = callbacks.put(key(message.id(), to), new
CallbackInfo(message, to, cb));
assert previous == null : format("Callback already exists for id
%d/%s! (%s)", message.id(), to, previous);
}
public void addWithExpiration(AbstractWriteResponseHandler<?> cb,
Message<?> message, Replica to)
{
- assert message.verb() == Verb.MUTATION_REQ || message.verb() ==
Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ;
+ Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ ||
message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() ==
Verb.PAXOS_COMMIT_REQ || message.verb() == Verb.FORWARDING_WRITE);
CallbackInfo previous = callbacks.put(key(message.id(),
to.endpoint()), new CallbackInfo(message, to.endpoint(), cb));
assert previous == null : format("Callback already exists for id
%d/%s! (%s)", message.id(), to.endpoint(), previous);
}
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index c88e1efb96..100aca623d 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -71,6 +71,7 @@ import org.apache.cassandra.repair.messages.SyncResponse;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.replication.ForwardedWrite;
import org.apache.cassandra.schema.SchemaMutationsSerializer;
import org.apache.cassandra.schema.SchemaPullVerbHandler;
import org.apache.cassandra.schema.SchemaPushVerbHandler;
@@ -249,6 +250,7 @@ public enum Verb
READ_RECONCILE_SEND (901, P0, rpcTimeout, READ, ()
-> ReadReconcileSend.serializer, () ->
ReadReconcileSend.verbHandler),
READ_RECONCILE_RCV (902, P0, rpcTimeout, MUTATION, ()
-> ReadReconcileReceive.serializer, () ->
ReadReconcileReceive.verbHandler),
READ_RECONCILE_NOTIFY (903, P0, rpcTimeout, REQUEST_RESPONSE, ()
-> ReadReconcileNotify.serializer, () ->
ReadReconcileNotify.verbHandler),
+ FORWARDING_WRITE (904, P3, writeTimeout, MUTATION, ()
-> ForwardedWrite.Request.serializer, () ->
ForwardedWrite.verbHandler),
INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () ->
NoPayload.serializer, () -> ResponseVerbHandler.instance
),
INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () ->
DataMovement.serializer, () -> DataMovementVerbHandler.instance,
INITIATE_DATA_MOVEMENTS_RSP ),
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index d1b8dda1db..0271d2eccf 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.base.Preconditions;
+
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
@@ -72,6 +74,7 @@ public abstract class CoordinatorLog
void witnessedRemoteMutation(MutationId mutationId, int onHostId)
{
+ Preconditions.checkArgument(!mutationId.isNone());
logger.trace("witnessed remote mutation {} from {}", mutationId,
onHostId);
lock.writeLock().lock();
try
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
index 55c38485b0..37d4f25851 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
@@ -28,6 +28,8 @@ import java.util.Comparator;
public class CoordinatorLogId implements Serializable
{
+ private static final CoordinatorLogId NONE = new
CoordinatorLogId(Integer.MIN_VALUE, Integer.MIN_VALUE);
+
/** TCM host ID */
protected final int hostId;
@@ -79,6 +81,21 @@ public class CoordinatorLogId implements Serializable
return (int) coordinatorLogId;
}
+ public static CoordinatorLogId none()
+ {
+ return NONE;
+ }
+
+ static boolean isNone(int hostId, int hostLogId)
+ {
+ return hostId == NONE.hostId && hostLogId == NONE.hostLogId;
+ }
+
+ public boolean isNone()
+ {
+ return this == NONE || isNone(hostId, hostLogId);
+ }
+
@Override
public String toString()
{
@@ -115,6 +132,8 @@ public class CoordinatorLogId implements Serializable
{
int hostId = in.readInt();
int hostLogId = in.readInt();
+ if (isNone(hostId, hostLogId))
+ return none();
return new CoordinatorLogId(hostId, hostLogId);
}
diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java
b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
new file mode 100644
index 0000000000..51a072a423
--- /dev/null
+++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.locator.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageFlag;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.net.Verb.MUTATION_REQ;
+
+/**
+ * For a forwarded write there are 2 nodes involved in coordination, a
coordinator and a leader. The coordinator is the
+ * node that the client is communicating with, and the leader is the mutation
replica that is handling the mutation
+ * tracking for that write.
+ */
+public class ForwardedWrite
+{
+ private static final Logger logger =
LoggerFactory.getLogger(ForwardedWrite.class);
+
+ public interface Request
+ {
+ enum Kind
+ {
+ MUTATION(0);
+
+ private final byte id;
+
+ Kind(int id)
+ {
+ this.id = (byte) id;
+ }
+
+ IVersionedSerializer<Request> serializer()
+ {
+ switch (this)
+ {
+ case MUTATION:
+ return MutationRequest.serializer;
+ default:
+ throw new IllegalStateException("Unhandled kind " +
this);
+ }
+ }
+
+ static final IVersionedSerializer<Kind> serializer = new
IVersionedSerializer<Request.Kind>()
+ {
+ @Override
+ public void serialize(Kind kind, DataOutputPlus out, int
version) throws IOException
+ {
+ out.writeByte(kind.id);
+
+ }
+
+ @Override
+ public Kind deserialize(DataInputPlus in, int version) throws
IOException
+ {
+ byte id = in.readByte();
+ switch (id)
+ {
+ case 0:
+ return MUTATION;
+ default:
+ throw new IllegalStateException("Unknown kind: " +
id);
+ }
+ }
+
+ @Override
+ public long serializedSize(Kind kind, int version)
+ {
+ return TypeSizes.BYTE_SIZE;
+ }
+ };
+ }
+
+ Kind kind();
+ DecoratedKey key();
+ void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo);
+
+ IVersionedSerializer<Request> serializer = new IVersionedSerializer<>()
+ {
+ @Override
+ public void serialize(Request request, DataOutputPlus out, int
version) throws IOException
+ {
+ Kind.serializer.serialize(request.kind(), out, version);
+ request.kind().serializer().serialize(request, out, version);
+ }
+
+ @Override
+ public Request deserialize(DataInputPlus in, int version) throws
IOException
+ {
+ Kind kind = Kind.serializer.deserialize(in, version);
+ return kind.serializer().deserialize(in, version);
+ }
+
+ @Override
+ public long serializedSize(Request request, int version)
+ {
+ long size = Kind.serializer.serializedSize(request.kind(),
version);
+ size += request.kind().serializer().serializedSize(request,
version);
+ return size;
+ }
+ };
+ }
+
+ public static class MutationRequest implements Request
+ {
+ private final Mutation mutation;
+ private final Set<NodeId> recipients;
+
+ private static Set<NodeId> nodeIds(ReplicaPlan.ForWrite plan)
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ Set<NodeId> recipients = new HashSet<>(plan.liveAndDown().size());
+ for (Replica replica : plan.liveAndDown())
+ recipients.add(cm.directory.peerId(replica.endpoint()));
+ return recipients;
+ }
+
+ MutationRequest(Mutation mutation, ReplicaPlan.ForWrite plan)
+ {
+ this(mutation, nodeIds(plan));
+ }
+
+ public MutationRequest(Mutation mutation, Set<NodeId> recipients)
+ {
+ Preconditions.checkArgument(mutation.id().isNone());
+ this.mutation = mutation;
+ this.recipients = recipients;
+ }
+
+ @Override
+ public Kind kind()
+ {
+ return Kind.MUTATION;
+ }
+
+ @Override
+ public DecoratedKey key()
+ {
+ return mutation.key();
+ }
+
+ @Override
+ public void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo)
+ {
+ Preconditions.checkState(ackTo != null);
+ Preconditions.checkArgument(mutation.id().isNone());
+ String keyspaceName = mutation.getKeyspaceName();
+ Token token = mutation.key().getToken();
+
+ MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+ // Do not wait for handler completion, since the coordinator is
already waiting and we don't want to block the stage
+ LeaderCallback handler = new LeaderCallback(keyspaceName,
mutation.key().getToken(), id, ackTo);
+ applyLocallyAndForwardToReplicas(mutation.withMutationId(id),
recipients, handler, ackTo);
+ }
+
+ // TODO: refactor common with applyLocallyAndSendToReplicas
+ private static void applyLocallyAndForwardToReplicas(Mutation
mutation, Set<NodeId> recipients, LeaderCallback handler, CoordinatorAckInfo
ackTo)
+ {
+ Preconditions.checkState(ackTo != null);
+ ClusterMetadata cm = ClusterMetadata.current();
+ String localDataCenter = cm.locator.local().datacenter;
+
+ boolean applyLocally = false;
+
+ // this DC replicas
+ List<Replica> localDCReplicas = null;
+
+ // extra-DC, grouped by DC
+ Map<String, List<Replica>> remoteDCReplicas = null;
+
+ // only need to create a Message for non-local writes
+ Message<Mutation> message = null;
+
+ // Expensive, but easier to work with Replica than
InetAddressAndPort for now
+ Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+ EndpointsForToken endpoints =
cm.placements.get(keyspace.getMetadata().params.replication).writes.forToken(mutation.key().getToken()).get();
+ Map<NodeId, Replica> replicas = new HashMap<>(recipients.size());
+ for (Replica replica : endpoints)
+ replicas.put(cm.directory.peerId(replica.endpoint()), replica);
+
+ // For performance, Mutation caches serialized buffers that are
computed lazily in serializedBuffer(). That
+ // computation is not synchronized however, and we will
potentially call that method concurrently for each
+ // dispatched message (not that concurrent calls to
serializedBuffer() are "unsafe" per se, just that they
+ // may result in multiple computations, making the caching
optimization moot). So forcing the serialization
+ // here to make sure it's already cached/computed when it's
concurrently used later.
+ // Side note: we have one cached buffers for each used
EncodingVersion and this only pre-compute the one for
+ // the current version, but it's just an optimization, and we're
ok not optimizing for mixed-version clusters.
+ Mutation.serializer.prepareSerializedBuffer(mutation,
MessagingService.current_version);
+
+ for (NodeId recipient : recipients)
+ {
+ if (cm.myNodeId().equals(recipient))
+ {
+ applyLocally = true;
+ continue;
+ }
+
+ if (message == null)
+ message = Message.builder(MUTATION_REQ, mutation)
+ .withRequestTime(handler.getRequestTime())
+
.withFlag(MessageFlag.CALL_BACK_ON_FAILURE)
+
.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo)
+ .withId(ackTo.id)
+ .build();
+
+ Replica replica = replicas.get(recipient);
+ String dc = cm.locator.location(replica.endpoint()).datacenter;
+
+ if (localDataCenter.equals(dc))
+ {
+ if (localDCReplicas == null)
+ localDCReplicas = new ArrayList<>();
+ localDCReplicas.add(replica);
+ }
+ else
+ {
+ if (remoteDCReplicas == null)
+ remoteDCReplicas = new HashMap<>();
+
+ List<Replica> messages = remoteDCReplicas.get(dc);
+ if (messages == null)
+ messages = remoteDCReplicas.computeIfAbsent(dc, ignore
-> new ArrayList<>(3)); // most DCs will have <= 3 replicas
+ messages.add(replica);
+ }
+ }
+
+ Preconditions.checkState(applyLocally); // the leader is always a
replica
+ TrackedWriteRequest.applyMutationLocally(mutation, handler);
+
+ if (localDCReplicas != null)
+ for (Replica replica : localDCReplicas)
+ MessagingService.instance().sendWithCallback(message,
replica.endpoint(), handler);
+
+ if (remoteDCReplicas != null)
+ {
+ // for each datacenter, send the message to one node to relay
the write to other replicas
+ for (List<Replica> dcReplicas : remoteDCReplicas.values())
+ TrackedWriteRequest.sendMessagesToRemoteDC(message,
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler,
ackTo);
+ }
+ }
+
+ public static final IVersionedSerializer<Request> serializer = new
IVersionedSerializer<>()
+ {
+ @Override
+ public void serialize(Request r, DataOutputPlus out, int version)
throws IOException
+ {
+ MutationRequest request = (MutationRequest) r;
+ Mutation.serializer.serialize(request.mutation, out, version);
+ out.writeInt(request.recipients.size());
+ for (NodeId recipient : request.recipients)
+ NodeId.messagingSerializer.serialize(recipient, out,
version);
+ }
+
+ @Override
+ public Request deserialize(DataInputPlus in, int version) throws
IOException
+ {
+ Mutation mutation = Mutation.serializer.deserialize(in,
version);
+ int numRecipients = in.readInt();
+ Set<NodeId> recipients =
Sets.newHashSetWithExpectedSize(numRecipients);
+ for (int i = 0; i < numRecipients; i++)
+ recipients.add(NodeId.messagingSerializer.deserialize(in,
version));
+ return new MutationRequest(mutation, recipients);
+ }
+
+ @Override
+ public long serializedSize(Request r, int version)
+ {
+ MutationRequest request = (MutationRequest) r;
+ long size =
Mutation.serializer.serializedSize(request.mutation, version);
+ size += TypeSizes.INT_SIZE;
+ for (NodeId recipient : request.recipients)
+ size +=
NodeId.messagingSerializer.serializedSize(recipient, version);
+ return size;
+ }
+ };
+ }
+
+ public static AbstractWriteResponseHandler<Object>
forwardMutation(Mutation mutation, ReplicaPlan.ForWrite plan,
AbstractReplicationStrategy strategy, Dispatcher.RequestTime requestTime)
+ {
+ // find leader
+ NodeProximity proximity = DatabaseDescriptor.getNodeProximity();
+ ClusterMetadata cm = ClusterMetadata.current();
+ Token token = mutation.key().getToken();
+ Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+ EndpointsForRange endpoints =
cm.placements.get(keyspace.getMetadata().params.replication).writes.forRange(token).get();
+ if (logger.isTraceEnabled())
+ logger.trace("Finding best leader from replicas {}", endpoints);
+
+ // TODO: Should match ReplicaPlans.findCounterLeaderReplica, including
DC-local priority, current health, severity, etc.
+ Replica leader =
proximity.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(),
endpoints).get(0);
+
+ // create callback and forward to leader
+ if (logger.isTraceEnabled())
+ logger.trace("Selected {} as leader for mutation with key {}",
leader.endpoint(), mutation.key());
+
+ AbstractWriteResponseHandler<Object> handler =
strategy.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null,
requestTime);
+
+ // Add callbacks for replicas to respond directly to coordinator
+ Message<Request> toLeader = Message.out(Verb.FORWARDING_WRITE, new
MutationRequest(mutation, plan));
+ for (Replica endpoint : endpoints)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Adding forwarding callback for response from {}
id {}", endpoint, toLeader.id());
+ MessagingService.instance().callbacks.addWithExpiration(handler,
toLeader, endpoint);
+ }
+
+ MessagingService.instance().send(toLeader, leader.endpoint());
+
+ return handler;
+ }
+
+ public static final IVerbHandler<Request> verbHandler = new
IVerbHandler<>()
+ {
+ @Override
+ public void doVerb(Message<Request> incoming)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Received incoming ForwardedWriteRequest {} id
{}", incoming, incoming.id());
+ CoordinatorAckInfo ackTo =
CoordinatorAckInfo.toCoordinator(incoming.from(), incoming.id());
+ Request request = incoming.payload;
+
+ // Once we support epoch changes, check epoch from coordinator
here, after potential queueing on the Stage
+ try
+ {
+ request.applyLocallyAndForwardToReplicas(ackTo);
+ }
+ catch (Exception e)
+ {
+ logger.error("Exception while executing forwarded write with
key {} on leader", request.key(), e);
+
MessagingService.instance().respondWithFailure(RequestFailureReason.UNKNOWN,
incoming);
+ }
+ }
+ };
+
+ // Leader just needs to acknowledge propagation for its own log, not for
client consistency level
+ // See
org.apache.cassandra.service.TrackedWriteResponseHandler.onResponse, this class
should probably merge with that one
+ public static class LeaderCallback implements RequestCallback<NoPayload>
+ {
+ private final String keyspace;
+ private final Token token;
+ private final MutationId id;
+ private final CoordinatorAckInfo ackTo;
+ private final Dispatcher.RequestTime requestTime =
Dispatcher.RequestTime.forImmediateExecution();
+
+ public LeaderCallback(String keyspace, Token token, MutationId id,
CoordinatorAckInfo ackTo)
+ {
+ this.keyspace = keyspace;
+ this.token = token;
+ this.id = id;
+ this.ackTo = ackTo;
+ }
+
+ @Override
+ public void onResponse(Message<NoPayload> msg)
+ {
+ // Local mutations are witnessed from Keyspace.applyInternalTracked
+ if (msg != null)
+
MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, id,
msg.from());
+
+ // Local write needs to be ack'd to coordinator
+ if (msg == null && ackTo != null)
+ {
+ Message<NoPayload> message =
Message.builder(Verb.MUTATION_RSP, NoPayload.noPayload)
+
.from(FBUtilities.getBroadcastAddressAndPort())
+ .withId(ackTo.id)
+ .build();
+ MessagingService.instance().send(message, ackTo.coordinator);
+ }
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailureReason
failureReason)
+ {
+ logger.error("Got failure from {} reason {}", from, failureReason);
+ }
+
+ @Override
+ public boolean invokeOnFailure()
+ {
+ return true;
+ }
+
+ public Dispatcher.RequestTime getRequestTime()
+ {
+ return requestTime;
+ }
+ }
+
+ public static class CoordinatorAckInfo
+ {
+ public static IVersionedSerializer<CoordinatorAckInfo> serializer =
new IVersionedSerializer<>()
+ {
+ @Override
+ public void serialize(CoordinatorAckInfo ackTo, DataOutputPlus
out, int version) throws IOException
+ {
+
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(ackTo.coordinator,
out, version);
+ out.writeLong(ackTo.id);
+ }
+
+ @Override
+ public CoordinatorAckInfo deserialize(DataInputPlus in, int
version) throws IOException
+ {
+ InetAddressAndPort coordinator =
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in,
version);
+ long id = in.readLong();
+ return new CoordinatorAckInfo(coordinator, id);
+ }
+
+ @Override
+ public long serializedSize(CoordinatorAckInfo ackTo, int version)
+ {
+ long size = 0;
+ size +=
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(ackTo.coordinator,
version);
+ size += TypeSizes.LONG_SIZE;
+ return size;
+ }
+ };
+
+ public final InetAddressAndPort coordinator;
+ public final long id;
+
+ private CoordinatorAckInfo(InetAddressAndPort coordinator, long id)
+ {
+ this.coordinator = coordinator;
+ this.id = id;
+ }
+
+ private static CoordinatorAckInfo toCoordinator(InetAddressAndPort
coordinator, long messageId)
+ {
+ return new CoordinatorAckInfo(coordinator, messageId);
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/replication/LocalMutationStates.java
b/src/java/org/apache/cassandra/replication/LocalMutationStates.java
index b46a2aca3a..b3f8a2d347 100644
--- a/src/java/org/apache/cassandra/replication/LocalMutationStates.java
+++ b/src/java/org/apache/cassandra/replication/LocalMutationStates.java
@@ -124,6 +124,7 @@ class LocalMutationStates
void finishWriting(Mutation mutation)
{
+ Preconditions.checkArgument(!mutation.id().isNone());
Entry entry = statesMap.get(mutation.id().offset());
Preconditions.checkNotNull(entry);
entry.visibility = Visibility.VISIBLE;
diff --git a/src/java/org/apache/cassandra/replication/MutationId.java
b/src/java/org/apache/cassandra/replication/MutationId.java
index ea33708833..5faf4db7d5 100644
--- a/src/java/org/apache/cassandra/replication/MutationId.java
+++ b/src/java/org/apache/cassandra/replication/MutationId.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
*/
public class MutationId extends ShortMutationId
{
- private static final long NONE_LOG_ID = Long.MIN_VALUE;
+ private static final long NONE_LOG_ID = CoordinatorLogId.none().asLong();
private static final long NONE_SEQUENCE_ID = Long.MIN_VALUE;
private static final int NONE_OFFSET = offset(NONE_SEQUENCE_ID);
private static final int NONE_TIMESTAMP = timestamp(NONE_SEQUENCE_ID);
diff --git a/src/java/org/apache/cassandra/replication/MutationSummary.java
b/src/java/org/apache/cassandra/replication/MutationSummary.java
index 18b0ca3bb4..4f07d9ca62 100644
--- a/src/java/org/apache/cassandra/replication/MutationSummary.java
+++ b/src/java/org/apache/cassandra/replication/MutationSummary.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.*;
import com.google.common.base.Preconditions;
+
import org.agrona.collections.Long2ObjectHashMap;
import org.apache.cassandra.db.Digest;
import org.apache.cassandra.db.TypeSizes;
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 310dc4dac6..01cc05274d 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
+import com.google.common.base.Preconditions;
+
import org.agrona.collections.IntArrayList;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
@@ -92,16 +94,19 @@ public class MutationTrackingService
public void witnessedRemoteMutation(String keyspace, Token token,
MutationId mutationId, InetAddressAndPort onHost)
{
+ Preconditions.checkArgument(!mutationId.isNone());
getOrCreate(keyspace).witnessedRemoteMutation(token, mutationId,
onHost);
}
public void startWriting(Mutation mutation)
{
+ Preconditions.checkArgument(!mutation.id().isNone());
getOrCreate(mutation.getKeyspaceName()).startWriting(mutation);
}
public void finishWriting(Mutation mutation)
{
+ Preconditions.checkArgument(!mutation.id().isNone());
getOrCreate(mutation.getKeyspaceName()).finishWriting(mutation);
}
@@ -153,6 +158,7 @@ public class MutationTrackingService
static KeyspaceShards make(KeyspaceMetadata keyspace, ClusterMetadata
cluster, IntSupplier logIdProvider)
{
+
Preconditions.checkArgument(keyspace.params.replicationType.isTracked());
Map<Range<Token>, Shard> shards = new HashMap<>();
cluster.placements.get(keyspace.params.replication).writes.forEach((tokenRange,
forRange) -> {
IntArrayList participants = new IntArrayList(forRange.size(),
IntArrayList.DEFAULT_NULL_VALUE);
diff --git a/src/java/org/apache/cassandra/replication/Shard.java
b/src/java/org/apache/cassandra/replication/Shard.java
index 538e9a57c0..72ff1119ca 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -52,7 +52,9 @@ public class Shard
this.sinceEpoch = sinceEpoch;
this.logs = new NonBlockingHashMapLong<>();
this.currentLocalLog = startNewLog(localHostId,
logIdProvider.getAsInt(), participants);
- logs.put(currentLocalLog.logId.asLong(), currentLocalLog);
+ CoordinatorLogId logId = currentLocalLog.logId;
+ Preconditions.checkArgument(!logId.isNone());
+ logs.put(logId.asLong(), currentLocalLog);
}
MutationId nextId()
diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
index 5ff3acb34e..7ae7b35e77 100644
--- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
+++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
@@ -49,14 +49,17 @@ import org.apache.cassandra.net.ForwardingInfo;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.TrackedWriteResponseHandler;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MonotonicClock;
-import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics;
import static org.apache.cassandra.net.Verb.MUTATION_REQ;
@@ -72,34 +75,37 @@ public class TrackedWriteRequest
* @param consistencyLevel the consistency level for the write operation
* @param requestTime object holding times when request got enqueued and
started execution
*/
- public static TrackedWriteResponseHandler perform(
+ public static AbstractWriteResponseHandler<?> perform(
Mutation mutation, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
{
Tracing.trace("Determining replicas for mutation");
+ Preconditions.checkArgument(mutation.id().isNone());
String keyspaceName = mutation.getKeyspaceName();
Keyspace keyspace = Keyspace.open(keyspaceName);
Token token = mutation.key().getToken();
- MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
- mutation = mutation.withMutationId(id);
-
ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(keyspace,
consistencyLevel, token, ReplicaPlans.writeNormal);
+ AbstractReplicationStrategy rs = plan.replicationStrategy();
- if (plan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null)
- writeMetrics.localRequests.mark();
- else
+ if (plan.lookup(FBUtilities.getBroadcastAddressAndPort()) == null)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Remote tracked request {} {}", mutation, plan);
writeMetrics.remoteRequests.mark();
+ return ForwardedWrite.forwardMutation(mutation, plan, rs,
requestTime);
+ }
- AbstractReplicationStrategy rs = plan.replicationStrategy();
-
- TrackedWriteResponseHandler handler =
- TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan,
null, WriteType.SIMPLE, null, requestTime),
- keyspaceName,
- mutation.key().getToken(),
- id);
+ if (logger.isTraceEnabled())
+ logger.trace("Local tracked request {} {}", mutation, plan);
+ writeMetrics.localRequests.mark();
+ MutationId id =
MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+ mutation = mutation.withMutationId(id);
+ TrackedWriteResponseHandler handler =
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null,
WriteType.SIMPLE, null, requestTime),
+ keyspaceName,
+ mutation.key().getToken(),
+ id);
applyLocallyAndSendToReplicas(mutation, plan, handler);
-
return handler;
}
@@ -142,7 +148,12 @@ public class TrackedWriteRequest
}
if (message == null)
- message = Message.outWithFlags(MUTATION_REQ, mutation,
handler.getRequestTime(), singletonList(MessageFlag.CALL_BACK_ON_FAILURE));
+ {
+ Message.Builder<Mutation> builder =
Message.builder(MUTATION_REQ, mutation)
+ .withRequestTime(handler.getRequestTime())
+ .withFlag(MessageFlag.CALL_BACK_ON_FAILURE);
+ message = builder.build();
+ }
String dc =
DatabaseDescriptor.getLocator().location(destination.endpoint()).datacenter;
@@ -175,31 +186,42 @@ public class TrackedWriteRequest
{
// for each datacenter, send the message to one node to relay the
write to other replicas
for (List<Replica> dcReplicas : remoteDCReplicas.values())
- sendMessagesToRemoteDC(message,
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler);
+ sendMessagesToRemoteDC(message,
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, null);
}
}
- private static void applyMutationLocally(Mutation mutation,
TrackedWriteResponseHandler handler)
+ static void applyMutationLocally(Mutation mutation,
RequestCallback<NoPayload> handler)
{
+ Preconditions.checkArgument(handler instanceof
TrackedWriteResponseHandler || handler instanceof
ForwardedWrite.LeaderCallback);
Stage.MUTATION.maybeExecuteImmediately(new
LocalMutationRunnable(mutation, handler));
}
private static class LocalMutationRunnable implements
DebuggableTask.RunnableDebuggableTask
{
private final Mutation mutation;
- private final TrackedWriteResponseHandler handler;
+ private final RequestCallback<NoPayload> handler;
- LocalMutationRunnable(Mutation mutation, TrackedWriteResponseHandler
handler)
+ LocalMutationRunnable(Mutation mutation, RequestCallback<NoPayload>
handler)
{
+ Preconditions.checkArgument(handler instanceof
TrackedWriteResponseHandler || handler instanceof
ForwardedWrite.LeaderCallback);
this.mutation = mutation;
this.handler = handler;
}
+ private Dispatcher.RequestTime getRequestTime()
+ {
+ if (handler instanceof TrackedWriteResponseHandler)
+ return ((TrackedWriteResponseHandler)
handler).getRequestTime();
+ if (handler instanceof ForwardedWrite.LeaderCallback)
+ return ((ForwardedWrite.LeaderCallback)
handler).getRequestTime();
+ throw new IllegalStateException();
+ }
+
@Override
public final void run()
{
long now = MonotonicClock.Global.approxTime.now();
- long deadline =
handler.getRequestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos());
+ long deadline =
getRequestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos());
if (now > deadline)
{
@@ -224,13 +246,13 @@ public class TrackedWriteRequest
@Override
public long creationTimeNanos()
{
- return handler.getRequestTime().enqueuedAtNanos();
+ return getRequestTime().enqueuedAtNanos();
}
@Override
public long startTimeNanos()
{
- return handler.getRequestTime().startedAtNanos();
+ return getRequestTime().startedAtNanos();
}
@Override
@@ -245,9 +267,10 @@ public class TrackedWriteRequest
/*
* Send the message to the first replica of targets, and have it forward
the message to others in its DC
*/
- private static void sendMessagesToRemoteDC(Message<? extends IMutation>
message,
- EndpointsForToken targets,
- TrackedWriteResponseHandler
handler)
+ static void sendMessagesToRemoteDC(Message<? extends IMutation> message,
+ EndpointsForToken targets,
+ RequestCallback<NoPayload> handler,
+ ForwardedWrite.CoordinatorAckInfo
ackTo)
{
final Replica target;
@@ -258,7 +281,7 @@ public class TrackedWriteRequest
for (Replica replica : forwardToReplicas)
{
-
MessagingService.instance().callbacks.addWithExpiration(handler, message,
replica);
+
MessagingService.instance().callbacks.addWithExpiration(handler, message,
replica.endpoint());
logger.trace("Adding FWD message to {}@{}", message.id(),
replica);
}
@@ -272,9 +295,14 @@ public class TrackedWriteRequest
{
target = targets.get(0);
}
+ if (ackTo != null)
+ message = message.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo);
Tracing.trace("Sending mutation to remote replica {}", target);
- MessagingService.instance().sendWriteWithCallback(message, target,
handler);
+ if (handler instanceof ForwardedWrite.LeaderCallback)
+
MessagingService.instance().sendForwardedWriteWithCallback(message, target,
(ForwardedWrite.LeaderCallback) handler);
+ else
+ MessagingService.instance().sendWriteWithCallback(message, target,
(AbstractWriteResponseHandler<?>) handler);
logger.trace("Sending message to {}@{}", message.id(), target);
}
diff --git
a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
index b224cbf1c0..56528f89c8 100644
--- a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
@@ -54,10 +54,9 @@ public class TrackedWriteResponseHandler extends
AbstractWriteResponseHandler<No
@Override
public void onResponse(Message<NoPayload> msg)
{
- /* local mutations are witnessed from Keyspace.applyInternalTracked */
+ // Local mutations are witnessed from Keyspace.applyInternalTracked
if (msg != null)
MutationTrackingService.instance.witnessedRemoteMutation(keyspace,
token, mutationId, msg.from());
-
wrapped.onResponse(msg);
}
diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeId.java
b/src/java/org/apache/cassandra/tcm/membership/NodeId.java
index f011314815..e9dead773d 100644
--- a/src/java/org/apache/cassandra/tcm/membership/NodeId.java
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeId.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import com.google.common.primitives.Ints;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.tcm.MultiStepOperation;
@@ -123,4 +124,25 @@ public class NodeId implements Comparable<NodeId>,
MultiStepOperation.SequenceKe
return TypeSizes.sizeofUnsignedVInt(t.id);
}
}
+
+ public static final IVersionedSerializer<NodeId> messagingSerializer = new
IVersionedSerializer<NodeId>()
+ {
+ @Override
+ public void serialize(NodeId n, DataOutputPlus out, int version)
throws IOException
+ {
+ out.writeUnsignedVInt32(n.id);
+ }
+
+ @Override
+ public NodeId deserialize(DataInputPlus in, int version) throws
IOException
+ {
+ return new NodeId(in.readUnsignedVInt32());
+ }
+
+ @Override
+ public long serializedSize(NodeId n, int version)
+ {
+ return TypeSizes.sizeofUnsignedVInt(n.id);
+ }
+ };
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
new file mode 100644
index 0000000000..108451ff81
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.replication.MutationSummary;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.assertj.core.api.Assertions;
+
+import static java.lang.String.format;
+import static
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static
org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+
+public class MutationTrackingWriteForwardingTest extends TestBaseImpl
+{
+ private static final int NODES = 3;
+ private static final int RF = 1;
+
+ private static int inst(int i)
+ {
+ return (i % NODES) + 1;
+ }
+
+ @Test
+ public void testBasicWriteForwarding() throws Throwable
+ {
+ // 2 DCs, 1 replica in each, to test forwarding to instances in remote
DCs and local DCs
+ Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(3,
(nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2",
"rack2"));
+
+ // TODO: disable background reconciliation so we can test that writes
are reconciling immediately
+ try (Cluster cluster = Cluster.build(NODES)
+ .withConfig(cfg ->
cfg.with(Feature.NETWORK)
+
.with(Feature.GOSSIP)
+
.set("mutation_tracking_enabled", "true")
+
.set("write_request_timeout", "1000ms"))
+ .withNodeIdTopology(topology)
+ .start())
+ {
+ String keyspaceName = "basic_write_forwarding_test";
+ String tableName = "tbl";
+ cluster.schemaChange(format("CREATE KEYSPACE %s WITH replication =
" +
+ "{'class': 'NetworkTopologyStrategy',
'replication_factor': " + RF + "} " +
+ "AND replication_type='tracked';",
keyspaceName));
+ cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v
int, primary key (k, c));", keyspaceName, tableName));
+
+ Map<IInstance, Integer> instanceUnreconciled = new HashMap<>();
+ int ROWS = 100;
+ for (int inserted = 0; inserted < ROWS; inserted++)
+ {
+ // Writes should be completed for the client, regardless of
whether they are forwarded or not
+ cluster.coordinator(inst(inserted)).execute(format("INSERT
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName),
ConsistencyLevel.ALL, inserted, inserted, inserted);
+
+                // Writes should be ack'd in the journal too, but these could
lag behind client acks, so we could be
+                // permissive here. Each write should be reconciled on the
leader, unreconciled on the replica (until
+                // background reconciliation broadcast is implemented), and
ignored on others.
+ IInstance replica = null;
+ for (IInvokableInstance instance : cluster)
+ {
+ int unreconciled = instance.callOnInstance(() -> {
+ Token token =
DatabaseDescriptor.getPartitioner().getMinimumToken();
+ Range<Token> fullRange = new Range<>(token, token);
+ TableId tableId =
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
+ MutationSummary summary =
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId,
true);
+ return summary.unreconciledIds();
+ });
+ int lastUnreconciled =
instanceUnreconciled.getOrDefault(instance, 0);
+ int newUnreconciled = unreconciled - lastUnreconciled;
+ if (newUnreconciled == 1)
+ {
+ Assertions.assertThat(replica).isNull();
+ replica = instance;
+ }
+ instanceUnreconciled.put(instance, unreconciled);
+ }
+ Assertions.assertThat(replica).isNotNull();
+ }
+ Assertions.assertThat(instanceUnreconciled).matches(map -> {
+ int sum = 0;
+ for (Integer value : map.values())
+ sum += value;
+ return sum == ROWS;
+ });
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]