This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new f31142bc77 CEP-45: Add support for unlogged batches
f31142bc77 is described below
commit f31142bc775a1ccbcd33d3d13a86ceceb34d2cde
Author: Blake Eggleston <[email protected]>
AuthorDate: Thu Oct 23 10:51:22 2025 -0700
CEP-45: Add support for unlogged batches
Patch by Blake Eggleston; Reviewed by Abe Ratnofsky for CASSANDRA-20957
---
.../apache/cassandra/batchlog/BatchlogManager.java | 25 +--
.../apache/cassandra/hints/HintsDispatcher.java | 8 +-
.../org/apache/cassandra/service/StorageProxy.java | 70 ++++++---
.../ConsensusMigrationMutationHelper.java | 111 +++++++++----
.../distributed/test/TrackedBatchTest.java | 149 ++++++++++++++++++
.../apache/cassandra/hints/HintsServiceTest.java | 8 +-
.../ConsensusMigrationMutationHelperTest.java | 171 +++++++++++++++++++++
7 files changed, 478 insertions(+), 64 deletions(-)
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 5be64ae884..1b2373e91a 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -393,7 +394,7 @@ public class BatchlogManager implements BatchlogManagerMBean
private final TimeUUID id;
private final long writtenAt;
private final int unsplitGcGs;
- private final List<Mutation> normalMutations;
+ private final List<Mutation> untrackedMutations;
private final List<Mutation> accordMutations;
private final int replayedBytes;
private final ClusterMetadata cm;
@@ -410,10 +411,14 @@ public class BatchlogManager implements
BatchlogManagerMBean
List<Mutation> unsplitMutations = new
ArrayList<>(serializedMutations.size());
this.replayedBytes = addMutations(unsplitMutations, writtenAt,
version, serializedMutations);
unsplitGcGs = gcgs(unsplitMutations);
- SplitMutations<Mutation> splitMutations =
ConsensusMigrationMutationHelper.splitMutationsIntoAccordAndNormal(cm,
unsplitMutations);
- logger.trace("Replaying batch with Accord {} and normal {}",
splitMutations.accordMutations(), splitMutations.normalMutations());
- normalMutations = splitMutations.normalMutations();
+ SplitMutations<Mutation> splitMutations =
ConsensusMigrationMutationHelper.splitMutations(cm, unsplitMutations);
+ logger.trace("Replaying batch with Accord {} and normal {}",
splitMutations.accordMutations(), splitMutations.untrackedMutations());
+ untrackedMutations = splitMutations.untrackedMutations();
accordMutations = splitMutations.accordMutations();
+
+ if (splitMutations.trackedMutations() != null)
+ throw new InvalidRequestException("Mutation tracking is
currently unsupported with logged batches");
+
if (accordMutations != null)
accordTxnStart = new
Dispatcher.RequestTime(Clock.Global.nanoTime());
this.cm = cm;
@@ -423,7 +428,7 @@ public class BatchlogManager implements BatchlogManagerMBean
{
logger.trace("Replaying batch {}", id);
- if ((normalMutations == null || normalMutations.isEmpty()) &&
(accordMutations == null || accordMutations.isEmpty()))
+ if ((untrackedMutations == null || untrackedMutations.isEmpty())
&& (accordMutations == null || accordMutations.isEmpty()))
return false;
if (MILLISECONDS.toSeconds(writtenAt) + unsplitGcGs <=
FBUtilities.nowInSeconds())
@@ -435,8 +440,8 @@ public class BatchlogManager implements BatchlogManagerMBean
accordResult = accordMutations != null ?
mutateWithAccordAsync(cm, accordMutations, null, accordTxnStart,
PreserveTimestamp.yes) : null;
}
- if (normalMutations != null)
- replayHandlers = sendReplays(normalMutations, writtenAt,
hintedNodes);
+ if (untrackedMutations != null)
+ replayHandlers = sendReplays(untrackedMutations, writtenAt,
hintedNodes);
rateLimiter.acquire(replayedBytes); // acquire afterwards, to not
mess up ttl calculation.
@@ -545,10 +550,10 @@ public class BatchlogManager implements
BatchlogManagerMBean
private void writeHintsForUndeliveredEndpoints(int startFrom,
Set<UUID> hintedNodes)
{
- if (normalMutations == null)
+ if (untrackedMutations == null)
return;
- int gcgs = gcgs(normalMutations);
+ int gcgs = gcgs(untrackedMutations);
// expired
if (MILLISECONDS.toSeconds(writtenAt) + gcgs <=
FBUtilities.nowInSeconds())
@@ -558,7 +563,7 @@ public class BatchlogManager implements BatchlogManagerMBean
for (int i = startFrom; i < replayHandlers.size(); i++)
{
ReplayWriteResponseHandler<Mutation> handler =
replayHandlers.get(i);
- Mutation undeliveredMutation = normalMutations.get(i);
+ Mutation undeliveredMutation = untrackedMutations.get(i);
if (handler != null)
{
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 3753b9334d..c15b46eaea 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -415,12 +415,14 @@ final class HintsDispatcher implements AutoCloseable
private SplitHint splitHintIntoAccordAndNormal(ClusterMetadata cm, Hint
hint)
{
- SplitMutation<Mutation> splitMutation =
ConsensusMigrationMutationHelper.instance().splitMutationIntoAccordAndNormal(hint.mutation,
cm);
+ SplitMutation<Mutation> splitMutation =
ConsensusMigrationMutationHelper.instance().splitMutation(hint.mutation, cm);
+ if (splitMutation.trackedMutation != null)
+ throw new IllegalStateException("Cannot generate hints for tracked
mutations");
if (splitMutation.accordMutation == null)
return new SplitHint(null, hint);
- if (splitMutation.normalMutation == null)
+ if (splitMutation.untrackedMutation == null)
return new SplitHint(splitMutation.accordMutation, null);
- Hint normalHint = Hint.create(splitMutation.normalMutation,
hint.creationTime, splitMutation.normalMutation.smallestGCGS());
+ Hint normalHint = Hint.create(splitMutation.untrackedMutation,
hint.creationTime, splitMutation.untrackedMutation.smallestGCGS());
return new SplitHint(splitMutation.accordMutation, normalHint);
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index f3d52c422e..355d20d19a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -150,6 +150,7 @@ import
org.apache.cassandra.service.accord.txn.TxnRangeReadResult;
import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.consensus.TransactionalMode;
+import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutations;
import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
@@ -218,7 +219,7 @@ import static
org.apache.cassandra.service.StorageProxy.ConsensusAttemptResult.s
import static
org.apache.cassandra.service.accord.txn.TxnResult.Kind.range_read;
import static
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_protocol;
import static
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.mutateWithAccordAsync;
-import static
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.splitMutationsIntoAccordAndNormal;
+import static
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.splitMutations;
import static
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata;
import static
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.shouldReadEphemerally;
import static
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.splitReadsIntoAccordAndNormal;
@@ -1328,14 +1329,11 @@ public class StorageProxy implements StorageProxyMBean
throw new InvalidRequestException("Mutation tracking is
currently unsupported with triggers");
if (mutateAtomically)
throw new InvalidRequestException("Mutation tracking is
currently unsupported with logged batches");
- if (mutations.size() > 1)
- throw new InvalidRequestException("Mutation tracking is
currently unsupported with unlogged batches");
if (updatesView)
throw new InvalidRequestException("Mutation tracking is
currently unsupported with materialized views");
-
- mutateWithTracking((Mutation) mutations.get(0), consistencyLevel,
requestTime);
}
- else if (augmented != null || mutateAtomically || updatesView)
+
+ if (augmented != null || mutateAtomically || updatesView)
mutateAtomically(augmented != null ? augmented :
(List<Mutation>)mutations, consistencyLevel, updatesView, requestTime);
else
dispatchMutationsWithRetryOnDifferentSystem(mutations,
consistencyLevel, requestTime, preserveTimestamps);
@@ -1348,29 +1346,41 @@ public class StorageProxy implements StorageProxyMBean
ClusterMetadata cm = ClusterMetadata.current();
try
{
- SplitMutations<?> splitMutations =
splitMutationsIntoAccordAndNormal(cm, (List<IMutation>)mutations);
+ SplitMutations<?> splitMutations = splitMutations(cm,
(List<IMutation>)mutations);
+ List<? extends IMutation> trackedMutations =
splitMutations.trackedMutations();
List<? extends IMutation> accordMutations =
splitMutations.accordMutations();
- List<? extends IMutation> normalMutations =
splitMutations.normalMutations();
+ List<? extends IMutation> untrackedMutations =
splitMutations.untrackedMutations();
// If there was ever any attempt to apply part of the mutation
using the eventually consistent path
// then we need to continue to use the timestamp used by the
eventually consistent path to not
// end up with multiple timestamps, but if it only ever used
the transactional path then we can
// use the transactional timestamp to get linearizability
- if (!preserveTimestamps.preserve && normalMutations != null)
+ if (!preserveTimestamps.preserve && (untrackedMutations !=
null || trackedMutations != null))
preserveTimestamps = PreserveTimestamp.yes;
// A BATCH statement has multiple mutations mixing server
timestamps and `USING TIMESTAMP`,
// which is not linearizable for the writes to Accord tables.
if (accordMutations != null && preserveTimestamps ==
PreserveTimestamp.mixedTimeSource)
checkMixedTimeSourceHandling();
+
+ // Supports batches with multiple tracked mutations
(previously limited to one).
+ List<AbstractWriteResponseHandler<?>> trackedHandlers =
trackedMutations != null ? new ArrayList<>(trackedMutations.size()) : null;
+ if (trackedMutations != null)
+ {
+ for (IMutation trackedMutation : trackedMutations)
+ {
+
trackedHandlers.add(TrackedWriteRequest.perform((Mutation) trackedMutation,
consistencyLevel, requestTime));
+ }
+ }
+
IAccordResult<TxnResult> accordResult = accordMutations !=
null ? mutateWithAccordAsync(cm, accordMutations, consistencyLevel,
requestTime, preserveTimestamps) : null;
- Tracing.trace("Split mutations into Accord {} and normal {}",
accordMutations, normalMutations);
+ Tracing.trace("Split mutations into tracked {}, Accord {}, and
untracked {}", trackedMutations, accordMutations, untrackedMutations);
Throwable failure = null;
try
{
- if (normalMutations != null)
+ if (untrackedMutations != null)
{
- mutate(normalMutations, consistencyLevel, requestTime);
- Tracing.trace("Successfully wrote normal mutations");
+ mutate(untrackedMutations, consistencyLevel,
requestTime);
+ Tracing.trace("Successfully wrote untracked
mutations");
}
}
catch (RetryOnDifferentSystemException e)
@@ -1378,7 +1388,7 @@ public class StorageProxy implements StorageProxyMBean
writeMetrics.retryDifferentSystem.mark();
writeMetricsForLevel(consistencyLevel).retryDifferentSystem.mark();
logger.debug("Retrying mutations on different system
because some mutations were misrouted according to Cassandra");
- Tracing.trace("Got {} from normal mutations, will retry",
e);
+ Tracing.trace("Got {} from untracked mutations, will
retry", e);
continue;
}
catch (CoordinatorBehindException e)
@@ -1387,7 +1397,7 @@ public class StorageProxy implements StorageProxyMBean
writeMetricsForLevel(consistencyLevel).retryCoordinatorBehind.mark();
mutations.forEach(IMutation::clearCachedSerializationsForRetry);
logger.debug("Retrying mutations now that coordinator has
caught up to cluster metadata");
- Tracing.trace("Got {} from normal mutations, will retry",
e);
+ Tracing.trace("Got {} from untracked mutations, will
retry", e);
continue;
}
catch (Exception e)
@@ -1395,6 +1405,22 @@ public class StorageProxy implements StorageProxyMBean
failure = Throwables.merge(failure, e);
}
+ try
+ {
+ if (trackedHandlers != null)
+ {
+ for (AbstractWriteResponseHandler<?> handler :
trackedHandlers)
+ {
+ handler.get();
+ }
+ Tracing.trace("Successfully wrote tracked mutations");
+ }
+ }
+ catch (Exception e)
+ {
+ failure = Throwables.merge(failure, e);
+ }
+
// Check if the Accord mutations succeeded asynchronously
try
{
@@ -1536,17 +1562,21 @@ public class StorageProxy implements StorageProxyMBean
BatchlogCleanup cleanup = new BatchlogCleanup(() ->
asyncRemoveFromBatchlog(batchlogReplicaPlan, batchUUID, requestTime));
// add a handler for each mutation that will not be written on
Accord - includes checking availability, but doesn't initiate any writes, yet
- SplitConsumer<Mutation> splitConsumer = (accordMutation,
normalMutation, originalMutations, mutationIndex) -> {
- Mutation eitherMutation = normalMutation != null ?
normalMutation : accordMutation;
+ SplitConsumer<Mutation> splitConsumer = (accordMutation,
untrackedMutation, trackedMutation, originalMutations, mutationIndex) -> {
+ Mutation eitherMutation = untrackedMutation != null ?
untrackedMutation : accordMutation;
Keyspace keyspace =
Keyspace.open(eitherMutation.getKeyspaceName());
Token tk = eitherMutation.key().getToken();
if (accordMutation != null)
accordMutations.add(accordMutation);
- if (normalMutation == null)
+ if (untrackedMutation == null && trackedMutation == null)
return;
+ if (trackedMutation != null)
+ throw new InvalidRequestException("Mutation tracking
is currently unsupported with logged batches");
+
+
// Always construct the replica plan to check availability
ReplicaPlan.ForWrite dataReplicaPlan =
ReplicaPlans.forWrite(cm, keyspace, consistencyLevel, tk,
ReplicaPlans.writeAll);
@@ -1555,7 +1585,7 @@ public class StorageProxy implements StorageProxyMBean
else
writeMetrics.remoteRequests.mark();
- WriteResponseHandlerWrapper wrapper =
wrapBatchResponseHandler(normalMutation,
+ WriteResponseHandlerWrapper wrapper =
wrapBatchResponseHandler(untrackedMutation,
dataReplicaPlan,
batchConsistencyLevel,
WriteType.BATCH,
@@ -1563,7 +1593,7 @@ public class StorageProxy implements StorageProxyMBean
requestTime);
wrappers.add(wrapper);
};
- splitMutationsIntoAccordAndNormal(cm, mutations,
splitConsumer);
+ ConsensusMigrationMutationHelper.splitMutations(cm, mutations,
splitConsumer);
attributeNonAccordLatency = !wrappers.isEmpty();
cleanup.setMutationsWaitingFor(wrappers.size() +
(accordMutations.isEmpty() ? 0 : 1));
Tracing.trace("Split batch into Accord {} and normal {}",
accordMutations, wrappers);
diff --git
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
index c319daee9b..282eca84eb 100644
---
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
+++
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
@@ -130,38 +130,48 @@ public class ConsensusMigrationMutationHelper
}
/**
- * Result of splitting mutations across Accord and non-transactional
boundaries
+ * Result of splitting mutations for different replication systems:
tracked / untracked / accord
*/
public static class SplitMutations<T extends IMutation> implements
SplitConsumer<T>
{
+ @Nullable
+ private List<T> trackedMutations;
+
@Nullable
private List<T> accordMutations;
@Nullable
- private List<T> normalMutations;
+ private List<T> untrackedMutations;
private SplitMutations() {}
+ public List<T> trackedMutations()
+ {
+ return trackedMutations;
+ }
+
public List<T> accordMutations()
{
return accordMutations;
}
- public List<T> normalMutations()
+ public List<T> untrackedMutations()
{
- return normalMutations;
+ return untrackedMutations;
}
@Override
- public void consume(@Nullable T accordMutation, @Nullable T
normalMutation, List<T> mutations, int mutationIndex)
+ public void consume(@Nullable T accordMutation, @Nullable T
untrackedMutation, @Nullable T trackedMutation, List<T> mutations, int
mutationIndex)
{
// Avoid allocating an ArrayList in common single mutation single
system case
- if (mutations.size() == 1 && (accordMutation != null ^
normalMutation != null))
+ if (mutations.size() == 1 && (accordMutation != null ^
(untrackedMutation != null || trackedMutation != null)))
{
if (accordMutation != null)
accordMutations = mutations;
+ else if (untrackedMutation != null)
+ untrackedMutations = mutations;
else
- normalMutations = mutations;
+ trackedMutations = mutations;
return;
}
@@ -171,57 +181,103 @@ public class ConsensusMigrationMutationHelper
accordMutations = new
ArrayList<>(Math.min(mutations.size(), 10));
accordMutations.add(accordMutation);
}
- if (normalMutation != null)
+ if (untrackedMutation != null)
+ {
+ if (untrackedMutations == null)
+ untrackedMutations = new
ArrayList<>(Math.min(mutations.size(), 10));
+ untrackedMutations.add(untrackedMutation);
+ }
+
+ if (trackedMutation != null)
{
- if (normalMutations == null)
- normalMutations = new
ArrayList<>(Math.min(mutations.size(), 10));
- normalMutations.add(normalMutation);
+ if (trackedMutations == null)
+ trackedMutations = new
ArrayList<>(Math.min(mutations.size(), 10));
+ trackedMutations.add(trackedMutation);
}
}
}
public interface SplitConsumer<T extends IMutation>
{
- void consume(@Nullable T accordMutation, @Nullable T normalMutation,
List<T> mutations, int mutationIndex);
+ void consume(@Nullable T accordMutation, @Nullable T
untrackedMutation, @Nullable T trackedMutation, List<T> mutations, int
mutationIndex);
}
- public static <T extends IMutation> SplitMutations<T>
splitMutationsIntoAccordAndNormal(ClusterMetadata cm, List<T> mutations)
+ public static <T extends IMutation> void splitMutations(ClusterMetadata
cm, List<T> mutations, SplitConsumer<T> splitConsumer)
{
- SplitMutations<T> splitMutations = new SplitMutations<>();
- splitMutationsIntoAccordAndNormal(cm, mutations, splitMutations);
- return splitMutations;
+ for (int i=0,mi=mutations.size(); i<mi; i++)
+ {
+ SplitMutation<T> splitMutation =
instance.splitMutation(mutations.get(i), cm);
+ splitConsumer.consume(splitMutation.accordMutation,
splitMutation.untrackedMutation, splitMutation.trackedMutation, mutations, i);
+ }
}
- public static <T extends IMutation> void
splitMutationsIntoAccordAndNormal(ClusterMetadata cm, List<T> mutations,
SplitConsumer<T> splitConsumer)
+ private static boolean isTrackedMutation(IMutation mutation)
{
- for (int i=0,mi=mutations.size(); i<mi; i++)
+ return
Schema.instance.getKeyspaceMetadata(mutation.getKeyspaceName()).params.replicationType.isTracked();
+ }
+
+
+ /**
+ * Splits mutations into tracked/untracked/accord mutations
+ */
+ public static <T extends IMutation> SplitMutations<T>
splitMutations(ClusterMetadata cm, List<T> mutations)
+ {
+ SplitMutations<T> result = new SplitMutations<>();
+
+ for (T mutation : mutations)
{
- SplitMutation<T> splitMutation =
instance.splitMutationIntoAccordAndNormal(mutations.get(i), cm);
- splitConsumer.consume(splitMutation.accordMutation,
splitMutation.normalMutation, mutations, i);
+ SplitMutation<T> split = instance.splitMutation(mutation, cm);
+
+ if (split.accordMutation != null)
+ {
+ if (result.accordMutations == null)
+ result.accordMutations = new ArrayList<>();
+ result.accordMutations.add(split.accordMutation);
+ }
+
+ if (split.untrackedMutation != null)
+ {
+ if (result.untrackedMutations == null)
+ result.untrackedMutations = new ArrayList<>();
+ result.untrackedMutations.add(split.untrackedMutation);
+ }
+
+ if (split.trackedMutation != null)
+ {
+ if (result.trackedMutations == null)
+ result.trackedMutations = new ArrayList<>();
+ result.trackedMutations.add(split.trackedMutation);
+ }
}
+
+ return result;
}
/**
- * Result of splitting a mutation across Accord and non-transactional
boundaries
+ * Result of splitting a mutation across Accord and untracked boundaries
*/
public static class SplitMutation<T extends IMutation>
{
@Nullable
public final T accordMutation;
@Nullable
- public final T normalMutation;
+ public final T untrackedMutation;
+ @Nullable
+ public final T trackedMutation;
- public SplitMutation(@Nullable T accordMutation, @Nullable T
normalMutation)
+ public SplitMutation(@Nullable T accordMutation, @Nullable T
untrackedMutation, @Nullable T trackedMutation)
{
this.accordMutation = accordMutation;
- this.normalMutation = normalMutation;
+ this.untrackedMutation = untrackedMutation;
+ this.trackedMutation = trackedMutation;
}
}
- public <T extends IMutation> SplitMutation<T>
splitMutationIntoAccordAndNormal(T mutation, ClusterMetadata cm)
+ public <T extends IMutation> SplitMutation<T> splitMutation(T mutation,
ClusterMetadata cm)
{
+ boolean isTracked = isTrackedMutation(mutation);
if (mutation.potentialTxnConflicts().allowed)
- return new SplitMutation<>(null, mutation);
+ return new SplitMutation<>(null, isTracked ? null : mutation,
isTracked ? mutation : null);
Token token = mutation.key().getToken();
Predicate<TableId> isAccordUpdate = tableId ->
tokenShouldBeWrittenThroughAccord(cm, tableId, token,
TransactionalMode::nonSerialWritesThroughAccord,
TransactionalMigrationFromMode::nonSerialWritesThroughAccord);
@@ -232,7 +288,8 @@ public class ConsensusMigrationMutationHelper
checkState((accordMutation == null ? false :
accordMutation.hasUpdateForTable(pu.metadata().id))
|| (normalMutation == null ? false :
normalMutation.hasUpdateForTable(pu.metadata().id)),
"All partition updates should still be present after
splitting");
- return new SplitMutation(accordMutation, normalMutation);
+
+ return new SplitMutation(accordMutation, isTracked ? null :
normalMutation, isTracked ? normalMutation : null);
}
public IAccordResult<TxnResult> mutateWithAccordAsync(ClusterMetadata cm,
Mutation mutation, @Nullable ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime, PreserveTimestamp preserveTimestamps)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java
b/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java
new file mode 100644
index 0000000000..9b683cf7e2
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.Schema;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Distributed tests for unlogged batches with mutation tracking.
+ * Tests the new capability to run batches against tracked keyspaces.
+ */
+public class TrackedBatchTest extends TestBaseImpl
+{
+ private static final String TRACKED_KS = "tracked_ks";
+ private static final String UNTRACKED_KS = "untracked_ks";
+
+ @Test
+ public void testMultipleTrackedMutations() throws Throwable
+ {
+ try (Cluster cluster = Cluster.build(3)
+ .withConfig(cfg ->
cfg.with(Feature.NETWORK)
+
.with(Feature.GOSSIP)
+
.set("mutation_tracking_enabled", "true"))
+ .start())
+ {
+ // Create tracked keyspace
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE " + TRACKED_KS
+ " WITH replication = " +
+ "{'class': 'SimpleStrategy',
'replication_factor': 3} " +
+ "AND
replication_type='tracked';"));
+ cluster.schemaChange("CREATE TABLE " + TRACKED_KS + ".tbl (k int
primary key, v int);");
+
+ // Verify keyspace is tracked
+ String keyspaceName = TRACKED_KS;
+ cluster.get(1).runOnInstance(() -> {
+ KeyspaceMetadata keyspace =
Schema.instance.getKeyspaceMetadata(keyspaceName);
+ assertEquals(ReplicationType.tracked,
keyspace.params.replicationType);
+ });
+
+ // Execute unlogged batch with multiple mutations to tracked
keyspace
+ String batchCql = "BEGIN UNLOGGED BATCH\n" +
+ " INSERT INTO " + TRACKED_KS + ".tbl (k, v)
VALUES (1, 100);\n" +
+ " INSERT INTO " + TRACKED_KS + ".tbl (k, v)
VALUES (2, 200);\n" +
+ " INSERT INTO " + TRACKED_KS + ".tbl (k, v)
VALUES (3, 300);\n" +
+ "APPLY BATCH";
+
+ cluster.coordinator(1).execute(batchCql, ConsistencyLevel.QUORUM);
+
+ // Verify all mutations succeeded
+ Object[][] result = cluster.coordinator(1).execute("SELECT * FROM
" + TRACKED_KS + ".tbl", ConsistencyLevel.QUORUM);
+ assertEquals(3, result.length);
+
+ // Verify data on all nodes (at RF=3, all nodes should have the
data)
+ for (int i = 1; i <= 3; i++)
+ {
+ IInvokableInstance node = cluster.get(i);
+ Object[][] nodeResult = node.executeInternal("SELECT * FROM "
+ TRACKED_KS + ".tbl");
+ assertEquals(3, nodeResult.length);
+ }
+ }
+ }
+
+ @Test
+ public void testMixedTrackedUntracked() throws Throwable
+ {
+ try (Cluster cluster = Cluster.build(3)
+ .withConfig(cfg ->
cfg.with(Feature.NETWORK)
+
.with(Feature.GOSSIP)
+
.set("mutation_tracking_enabled", "true"))
+ .start())
+ {
+ // Create tracked keyspace
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE " + TRACKED_KS
+ " WITH replication = " +
+ "{'class': 'SimpleStrategy',
'replication_factor': 3} " +
+ "AND
replication_type='tracked';"));
+ cluster.schemaChange("CREATE TABLE " + TRACKED_KS + ".tbl (k int
primary key, v int);");
+
+ // Create untracked keyspace
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE " +
UNTRACKED_KS + " WITH replication = " +
+ "{'class': 'SimpleStrategy',
'replication_factor': 3};"));
+ cluster.schemaChange("CREATE TABLE " + UNTRACKED_KS + ".tbl (k int
primary key, v int);");
+
+ // Verify keyspace types
+ String trackedKsName = TRACKED_KS;
+ String untrackedKsName = UNTRACKED_KS;
+ cluster.get(1).runOnInstance(() -> {
+ KeyspaceMetadata tracked =
Schema.instance.getKeyspaceMetadata(trackedKsName);
+ assertEquals(ReplicationType.tracked,
tracked.params.replicationType);
+
+ KeyspaceMetadata untracked =
Schema.instance.getKeyspaceMetadata(untrackedKsName);
+ assertEquals(ReplicationType.untracked,
untracked.params.replicationType);
+ });
+
+ // Execute mixed batch
+ String batchCql = "BEGIN UNLOGGED BATCH\n" +
+ " INSERT INTO " + TRACKED_KS + ".tbl (k, v)
VALUES (1, 100);\n" +
+ " INSERT INTO " + UNTRACKED_KS + ".tbl (k, v)
VALUES (2, 200);\n" +
+ " INSERT INTO " + TRACKED_KS + ".tbl (k, v)
VALUES (3, 300);\n" +
+ " INSERT INTO " + UNTRACKED_KS + ".tbl (k, v)
VALUES (4, 400);\n" +
+ "APPLY BATCH";
+
+ cluster.coordinator(1).execute(batchCql, ConsistencyLevel.QUORUM);
+
+ // Verify tracked keyspace mutations
+ Object[][] trackedResult = cluster.coordinator(1).execute("SELECT
* FROM " + TRACKED_KS + ".tbl", ConsistencyLevel.QUORUM);
+ assertEquals(2, trackedResult.length);
+
+ // Verify untracked keyspace mutations
+ Object[][] untrackedResult =
cluster.coordinator(1).execute("SELECT * FROM " + UNTRACKED_KS + ".tbl",
ConsistencyLevel.QUORUM);
+ assertEquals(2, untrackedResult.length);
+
+ // Verify data on all nodes
+ for (int i = 1; i <= 3; i++)
+ {
+ IInvokableInstance node = cluster.get(i);
+
+ Object[][] trackedNodeResult = node.executeInternal("SELECT *
FROM " + TRACKED_KS + ".tbl");
+ assertEquals(2, trackedNodeResult.length);
+
+ Object[][] untrackedNodeResult = node.executeInternal("SELECT
* FROM " + UNTRACKED_KS + ".tbl");
+ assertEquals(2, untrackedNodeResult.length);
+ }
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index 773b30f677..df71961483 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -220,16 +220,16 @@ public class HintsServiceTest
int count = 0;
@Override
- public <T extends IMutation> SplitMutation<T>
splitMutationIntoAccordAndNormal(T mutation, ClusterMetadata cm)
+ public <T extends IMutation> SplitMutation<T>
splitMutation(T mutation, ClusterMetadata cm)
{
if (count > 2)
- return
super.splitMutationIntoAccordAndNormal(mutation, cm);
+ return super.splitMutation(mutation, cm);
SplitMutation split;
if (count % 2 == 0)
- split = new SplitMutation(mutation, null);
+ split = new SplitMutation(mutation, null, null);
else
- split = new SplitMutation<>(null, mutation);
+ split = new SplitMutation<>(null, mutation, null);
count++;
return split;
}
diff --git
a/test/unit/org/apache/cassandra/service/ConsensusMigrationMutationHelperTest.java
b/test/unit/org/apache/cassandra/service/ConsensusMigrationMutationHelperTest.java
new file mode 100644
index 0000000000..d880771517
--- /dev/null
+++
b/test/unit/org/apache/cassandra/service/ConsensusMigrationMutationHelperTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaTransformations;
+import org.apache.cassandra.schema.TableMetadata;
+import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper;
+import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutations;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.transformations.AlterSchema;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for ConsensusMigrationMutationHelper mutation splitting logic.
+ *
+ * Focuses on tracked vs untracked keyspace separation without testing Accord
integration.
+ */
+public class ConsensusMigrationMutationHelperTest
+{
+ private static final String TRACKED_KS = "tracked_ks";
+ private static final String UNTRACKED_KS = "untracked_ks";
+ private static final String TABLE = "test_table";
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ // Initialize cluster metadata service for each test
+ ClusterMetadataService.unsetInstance();
+
ClusterMetadataService.setInstance(ClusterMetadataTestHelper.syncInstanceForTest());
+
ClusterMetadataService.instance().log().unsafeBootstrapForTesting(FBUtilities.getBroadcastAddressAndPort());
+
+ // Create tracked keyspace with table
+ TableMetadata trackedTable = TableMetadata.builder(TRACKED_KS, TABLE)
+ .addPartitionKeyColumn("pk",
UTF8Type.instance)
+ .addRegularColumn("value",
UTF8Type.instance)
+ .build();
+ ClusterMetadataTestHelper.createKeyspace(TRACKED_KS,
KeyspaceParams.simple(3, ReplicationType.tracked));
+ ClusterMetadataTestHelper.commit(new
AlterSchema(SchemaTransformations.addTable(trackedTable, false)));
+
+ // Create untracked keyspace with table
+ TableMetadata untrackedTable = TableMetadata.builder(UNTRACKED_KS,
TABLE)
+
.addPartitionKeyColumn("pk", UTF8Type.instance)
+ .addRegularColumn("value",
UTF8Type.instance)
+ .build();
+ ClusterMetadataTestHelper.createKeyspace(UNTRACKED_KS,
KeyspaceParams.simple(3));
+ ClusterMetadataTestHelper.commit(new
AlterSchema(SchemaTransformations.addTable(untrackedTable, false)));
+ }
+
+ @Test
+ public void testSplitTrackedOnly()
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ List<Mutation> mutations = new ArrayList<>();
+
+ // Create 3 mutations to tracked keyspace
+ mutations.add(createMutation(TRACKED_KS, "key1", "value1"));
+ mutations.add(createMutation(TRACKED_KS, "key2", "value2"));
+ mutations.add(createMutation(TRACKED_KS, "key3", "value3"));
+
+ SplitMutations<Mutation> split =
ConsensusMigrationMutationHelper.splitMutations(cm, mutations);
+
+ // All mutations should go to tracked bucket
+ assertNotNull(split.trackedMutations());
+ assertEquals(3, split.trackedMutations().size());
+
+ // Other buckets should be null
+ assertNull(split.untrackedMutations());
+ assertNull(split.accordMutations());
+ }
+
+ @Test
+ public void testSplitUntrackedOnly()
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ List<Mutation> mutations = new ArrayList<>();
+
+ // Create 3 mutations to untracked keyspace
+ mutations.add(createMutation(UNTRACKED_KS, "key1", "value1"));
+ mutations.add(createMutation(UNTRACKED_KS, "key2", "value2"));
+ mutations.add(createMutation(UNTRACKED_KS, "key3", "value3"));
+
+ SplitMutations<Mutation> split =
ConsensusMigrationMutationHelper.splitMutations(cm, mutations);
+
+ // All mutations should go to untracked bucket
+ assertNotNull(split.untrackedMutations());
+ assertEquals(3, split.untrackedMutations().size());
+
+ // Other buckets should be null
+ assertNull(split.trackedMutations());
+ assertNull(split.accordMutations());
+ }
+
+ @Test
+ public void testSplitMixedTrackedUntracked()
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ List<Mutation> mutations = new ArrayList<>();
+
+ // Create mixed mutations: 2 tracked, 2 untracked
+ mutations.add(createMutation(TRACKED_KS, "key1", "value1"));
+ mutations.add(createMutation(UNTRACKED_KS, "key2", "value2"));
+ mutations.add(createMutation(TRACKED_KS, "key3", "value3"));
+ mutations.add(createMutation(UNTRACKED_KS, "key4", "value4"));
+
+ SplitMutations<Mutation> split =
ConsensusMigrationMutationHelper.splitMutations(cm, mutations);
+
+ // Check tracked bucket
+ assertNotNull(split.trackedMutations());
+ assertEquals(2, split.trackedMutations().size());
+ assertEquals("key1",
UTF8Type.instance.compose(split.trackedMutations().get(0).key().getKey()));
+ assertEquals("key3",
UTF8Type.instance.compose(split.trackedMutations().get(1).key().getKey()));
+
+ // Check untracked bucket
+ assertNotNull(split.untrackedMutations());
+ assertEquals(2, split.untrackedMutations().size());
+ assertEquals("key2",
UTF8Type.instance.compose(split.untrackedMutations().get(0).key().getKey()));
+ assertEquals("key4",
UTF8Type.instance.compose(split.untrackedMutations().get(1).key().getKey()));
+
+ // Accord should be null
+ assertNull(split.accordMutations());
+ }
+
+ private Mutation createMutation(String keyspace, String partitionKey,
String value)
+ {
+ TableMetadata table = Schema.instance.getTableMetadata(keyspace,
TABLE);
+ return new RowUpdateBuilder(table, 0, partitionKey)
+ .add("value", value)
+ .build();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]