This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
     new f31142bc77 CEP-45: Add support for unlogged batches
f31142bc77 is described below

commit f31142bc775a1ccbcd33d3d13a86ceceb34d2cde
Author: Blake Eggleston <[email protected]>
AuthorDate: Thu Oct 23 10:51:22 2025 -0700

    CEP-45: Add support for unlogged batches
    
    Patch by Blake Eggleston; Reviewed by Abe Ratnofsky for CASSANDRA-20957
---
 .../apache/cassandra/batchlog/BatchlogManager.java |  25 +--
 .../apache/cassandra/hints/HintsDispatcher.java    |   8 +-
 .../org/apache/cassandra/service/StorageProxy.java |  70 ++++++---
 .../ConsensusMigrationMutationHelper.java          | 111 +++++++++----
 .../distributed/test/TrackedBatchTest.java         | 149 ++++++++++++++++++
 .../apache/cassandra/hints/HintsServiceTest.java   |   8 +-
 .../ConsensusMigrationMutationHelperTest.java      | 171 +++++++++++++++++++++
 7 files changed, 478 insertions(+), 64 deletions(-)

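For orientation, the patch reworks the mutation-splitting helper so a coordinator can bucket the statements of an unlogged batch by replication system. A minimal sketch of how a caller consumes the renamed API (illustrative only, not part of the commit; the handle* methods are hypothetical placeholders):

    ClusterMetadata cm = ClusterMetadata.current();
    SplitMutations<Mutation> split = ConsensusMigrationMutationHelper.splitMutations(cm, mutations);
    if (split.accordMutations() != null)
        handleAccord(split.accordMutations());        // tables currently written through Accord
    if (split.trackedMutations() != null)
        handleTracked(split.trackedMutations());      // keyspaces with replication_type='tracked'
    if (split.untrackedMutations() != null)
        handleUntracked(split.untrackedMutations());  // classic eventually consistent path
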
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 5be64ae884..1b2373e91a 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -393,7 +394,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         private final TimeUUID id;
         private final long writtenAt;
         private final int unsplitGcGs;
-        private final List<Mutation> normalMutations;
+        private final List<Mutation> untrackedMutations;
         private final List<Mutation> accordMutations;
         private final int replayedBytes;
         private final ClusterMetadata cm;
@@ -410,10 +411,14 @@ public class BatchlogManager implements BatchlogManagerMBean
             List<Mutation> unsplitMutations = new ArrayList<>(serializedMutations.size());
             this.replayedBytes = addMutations(unsplitMutations, writtenAt, version, serializedMutations);
             unsplitGcGs = gcgs(unsplitMutations);
-            SplitMutations<Mutation> splitMutations = ConsensusMigrationMutationHelper.splitMutationsIntoAccordAndNormal(cm, unsplitMutations);
-            logger.trace("Replaying batch with Accord {} and normal {}", splitMutations.accordMutations(), splitMutations.normalMutations());
-            normalMutations = splitMutations.normalMutations();
+            SplitMutations<Mutation> splitMutations = ConsensusMigrationMutationHelper.splitMutations(cm, unsplitMutations);
+            logger.trace("Replaying batch with Accord {} and normal {}", splitMutations.accordMutations(), splitMutations.untrackedMutations());
+            untrackedMutations = splitMutations.untrackedMutations();
             accordMutations = splitMutations.accordMutations();
+
+            if (splitMutations.trackedMutations() != null)
+                throw new InvalidRequestException("Mutation tracking is currently unsupported with logged batches");
+
             if (accordMutations != null)
                 accordTxnStart = new Dispatcher.RequestTime(Clock.Global.nanoTime());
             this.cm = cm;
@@ -423,7 +428,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             logger.trace("Replaying batch {}", id);
 
-            if ((normalMutations == null || normalMutations.isEmpty()) && (accordMutations == null || accordMutations.isEmpty()))
+            if ((untrackedMutations == null || untrackedMutations.isEmpty()) && (accordMutations == null || accordMutations.isEmpty()))
                 return false;
 
             if (MILLISECONDS.toSeconds(writtenAt) + unsplitGcGs <= FBUtilities.nowInSeconds())
@@ -435,8 +440,8 @@ public class BatchlogManager implements BatchlogManagerMBean
                 accordResult = accordMutations != null ? mutateWithAccordAsync(cm, accordMutations, null, accordTxnStart, PreserveTimestamp.yes) : null;
             }
 
-            if (normalMutations != null)
-                replayHandlers = sendReplays(normalMutations, writtenAt, hintedNodes);
+            if (untrackedMutations != null)
+                replayHandlers = sendReplays(untrackedMutations, writtenAt, hintedNodes);
 
             rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
 
@@ -545,10 +550,10 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         private void writeHintsForUndeliveredEndpoints(int startFrom, Set<UUID> hintedNodes)
         {
-            if (normalMutations == null)
+            if (untrackedMutations == null)
                 return;
 
-            int gcgs = gcgs(normalMutations);
+            int gcgs = gcgs(untrackedMutations);
 
             // expired
             if (MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
@@ -558,7 +563,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             for (int i = startFrom; i < replayHandlers.size(); i++)
             {
                 ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
-                Mutation undeliveredMutation = normalMutations.get(i);
+                Mutation undeliveredMutation = untrackedMutations.get(i);
 
                 if (handler != null)
                 {
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 3753b9334d..c15b46eaea 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -415,12 +415,14 @@ final class HintsDispatcher implements AutoCloseable
 
     private SplitHint splitHintIntoAccordAndNormal(ClusterMetadata cm, Hint hint)
     {
-        SplitMutation<Mutation> splitMutation = ConsensusMigrationMutationHelper.instance().splitMutationIntoAccordAndNormal(hint.mutation, cm);
+        SplitMutation<Mutation> splitMutation = ConsensusMigrationMutationHelper.instance().splitMutation(hint.mutation, cm);
+        if (splitMutation.trackedMutation != null)
+            throw new IllegalStateException("Cannot generate hints for tracked mutations");
         if (splitMutation.accordMutation == null)
             return new SplitHint(null, hint);
-        if (splitMutation.normalMutation == null)
+        if (splitMutation.untrackedMutation == null)
             return new SplitHint(splitMutation.accordMutation, null);
-        Hint normalHint = Hint.create(splitMutation.normalMutation, hint.creationTime, splitMutation.normalMutation.smallestGCGS());
+        Hint normalHint = Hint.create(splitMutation.untrackedMutation, hint.creationTime, splitMutation.untrackedMutation.smallestGCGS());
         return new SplitHint(splitMutation.accordMutation, normalHint);
     }
 
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f3d52c422e..355d20d19a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -150,6 +150,7 @@ import org.apache.cassandra.service.accord.txn.TxnRangeReadResult;
 import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper;
 import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
 import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutations;
 import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
@@ -218,7 +219,7 @@ import static org.apache.cassandra.service.StorageProxy.ConsensusAttemptResult.s
 import static org.apache.cassandra.service.accord.txn.TxnResult.Kind.range_read;
 import static org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_protocol;
 import static org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.mutateWithAccordAsync;
-import static org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.splitMutationsIntoAccordAndNormal;
+import static org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.splitMutations;
 import static org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.getTableMetadata;
 import static org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.shouldReadEphemerally;
 import static org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.splitReadsIntoAccordAndNormal;
@@ -1328,14 +1329,11 @@ public class StorageProxy implements StorageProxyMBean
                 throw new InvalidRequestException("Mutation tracking is currently unsupported with triggers");
             if (mutateAtomically)
                 throw new InvalidRequestException("Mutation tracking is currently unsupported with logged batches");
-            if (mutations.size() > 1)
-                throw new InvalidRequestException("Mutation tracking is currently unsupported with unlogged batches");
             if (updatesView)
                 throw new InvalidRequestException("Mutation tracking is currently unsupported with materialized views");
-
-            mutateWithTracking((Mutation) mutations.get(0), consistencyLevel, requestTime);
         }
-        else if (augmented != null || mutateAtomically || updatesView)
+
+        if (augmented != null || mutateAtomically || updatesView)
             mutateAtomically(augmented != null ? augmented : (List<Mutation>)mutations, consistencyLevel, updatesView, requestTime);
         else
             dispatchMutationsWithRetryOnDifferentSystem(mutations, consistencyLevel, requestTime, preserveTimestamps);
@@ -1348,29 +1346,41 @@ public class StorageProxy implements StorageProxyMBean
             ClusterMetadata cm = ClusterMetadata.current();
             try
             {
-                SplitMutations<?> splitMutations = splitMutationsIntoAccordAndNormal(cm, (List<IMutation>)mutations);
+                SplitMutations<?> splitMutations = splitMutations(cm, (List<IMutation>)mutations);
+                List<? extends IMutation> trackedMutations = splitMutations.trackedMutations();
                 List<? extends IMutation> accordMutations = splitMutations.accordMutations();
-                List<? extends IMutation> normalMutations = splitMutations.normalMutations();
+                List<? extends IMutation> untrackedMutations = splitMutations.untrackedMutations();
                 // If there was ever any attempt to apply part of the mutation using the eventually consistent path
                 // then we need to continue to use the timestamp used by the eventually consistent path to not
                 // end up with multiple timestamps, but if it only ever used the transactional path then we can
                 // use the transactional timestamp to get linearizability
-                if (!preserveTimestamps.preserve && normalMutations != null)
+                if (!preserveTimestamps.preserve && (untrackedMutations != null || trackedMutations != null))
                     preserveTimestamps = PreserveTimestamp.yes;
                 // A BATCH statement has multiple mutations mixing server timestamps and `USING TIMESTAMP`,
                 // which is not linearizable for the writes to Accord tables.
                 if (accordMutations != null && preserveTimestamps == PreserveTimestamp.mixedTimeSource)
                     checkMixedTimeSourceHandling();
+
+                // Supports batches with multiple tracked mutations (previously limited to one).
+                List<AbstractWriteResponseHandler<?>> trackedHandlers = trackedMutations != null ? new ArrayList<>(trackedMutations.size()) : null;
+                if (trackedMutations != null)
+                {
+                    for (IMutation trackedMutation : trackedMutations)
+                    {
+                        trackedHandlers.add(TrackedWriteRequest.perform((Mutation) trackedMutation, consistencyLevel, requestTime));
+                    }
+                }
+
                 IAccordResult<TxnResult> accordResult = accordMutations != null ? mutateWithAccordAsync(cm, accordMutations, consistencyLevel, requestTime, preserveTimestamps) : null;
-                Tracing.trace("Split mutations into Accord {} and normal {}", accordMutations, normalMutations);
+                Tracing.trace("Split mutations into tracked {}, Accord {}, and untracked {}", trackedMutations, accordMutations, untrackedMutations);
 
                 Throwable failure = null;
                 try
                 {
-                    if (normalMutations != null)
+                    if (untrackedMutations != null)
                     {
-                        mutate(normalMutations, consistencyLevel, requestTime);
-                        Tracing.trace("Successfully wrote normal mutations");
+                        mutate(untrackedMutations, consistencyLevel, requestTime);
+                        Tracing.trace("Successfully wrote untracked mutations");
                     }
                 }
                 catch (RetryOnDifferentSystemException e)
@@ -1378,7 +1388,7 @@ public class StorageProxy implements StorageProxyMBean
                     writeMetrics.retryDifferentSystem.mark();
                     writeMetricsForLevel(consistencyLevel).retryDifferentSystem.mark();
                     logger.debug("Retrying mutations on different system because some mutations were misrouted according to Cassandra");
-                    Tracing.trace("Got {} from normal mutations, will retry", e);
+                    Tracing.trace("Got {} from untracked mutations, will retry", e);
                     continue;
                 }
                 catch (CoordinatorBehindException e)
@@ -1387,7 +1397,7 @@ public class StorageProxy implements StorageProxyMBean
                     writeMetricsForLevel(consistencyLevel).retryCoordinatorBehind.mark();
                     mutations.forEach(IMutation::clearCachedSerializationsForRetry);
                     logger.debug("Retrying mutations now that coordinator has caught up to cluster metadata");
-                    Tracing.trace("Got {} from normal mutations, will retry", e);
+                    Tracing.trace("Got {} from untracked mutations, will retry", e);
                     continue;
                 }
                 catch (Exception e)
@@ -1395,6 +1405,22 @@ public class StorageProxy implements StorageProxyMBean
                     failure = Throwables.merge(failure, e);
                 }
 
+                try
+                {
+                    if (trackedHandlers != null)
+                    {
+                        for (AbstractWriteResponseHandler<?> handler : trackedHandlers)
+                        {
+                            handler.get();
+                        }
+                        Tracing.trace("Successfully wrote tracked mutations");
+                    }
+                }
+                catch (Exception e)
+                {
+                    failure = Throwables.merge(failure, e);
+                }
+
                 // Check if the Accord mutations succeeded asynchronously
                 try
                 {
@@ -1536,17 +1562,21 @@ public class StorageProxy implements StorageProxyMBean
                 BatchlogCleanup cleanup = new BatchlogCleanup(() -> asyncRemoveFromBatchlog(batchlogReplicaPlan, batchUUID, requestTime));
 
                 // add a handler for each mutation that will not be written on Accord - includes checking availability, but doesn't initiate any writes, yet
-                SplitConsumer<Mutation> splitConsumer = (accordMutation, normalMutation, originalMutations, mutationIndex) -> {
-                    Mutation eitherMutation = normalMutation != null ? normalMutation : accordMutation;
+                SplitConsumer<Mutation> splitConsumer = (accordMutation, untrackedMutation, trackedMutation, originalMutations, mutationIndex) -> {
+                    Mutation eitherMutation = untrackedMutation != null ? untrackedMutation : accordMutation;
                     Keyspace keyspace = Keyspace.open(eitherMutation.getKeyspaceName());
                     Token tk = eitherMutation.key().getToken();
 
                     if (accordMutation != null)
                         accordMutations.add(accordMutation);
 
-                    if (normalMutation == null)
+                    if (untrackedMutation == null && trackedMutation == null)
                         return;
 
+                    if (trackedMutation != null)
+                        throw new InvalidRequestException("Mutation tracking is currently unsupported with logged batches");
+
+
                     // Always construct the replica plan to check availability
                     ReplicaPlan.ForWrite dataReplicaPlan = ReplicaPlans.forWrite(cm, keyspace, consistencyLevel, tk, ReplicaPlans.writeAll);
 
@@ -1555,7 +1585,7 @@ public class StorageProxy implements StorageProxyMBean
                     else
                         writeMetrics.remoteRequests.mark();
 
-                    WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(normalMutation,
+                    WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(untrackedMutation,
                                                                                    dataReplicaPlan,
                                                                                    batchConsistencyLevel,
                                                                                    WriteType.BATCH,
@@ -1563,7 +1593,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                                    requestTime);
                     wrappers.add(wrapper);
                 };
-                splitMutationsIntoAccordAndNormal(cm, mutations,  splitConsumer);
+                ConsensusMigrationMutationHelper.splitMutations(cm, mutations, splitConsumer);
                 attributeNonAccordLatency = !wrappers.isEmpty();
                 cleanup.setMutationsWaitingFor(wrappers.size() + (accordMutations.isEmpty() ? 0 : 1));
                 Tracing.trace("Split batch into Accord {} and normal {}", accordMutations, wrappers);
diff --git a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
index c319daee9b..282eca84eb 100644
--- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
+++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
@@ -130,38 +130,48 @@ public class ConsensusMigrationMutationHelper
     }
 
     /**
-     * Result of splitting mutations across Accord and non-transactional boundaries
+     * Result of splitting mutations for different replication systems: tracked / untracked / accord
      */
     public static class SplitMutations<T extends IMutation> implements SplitConsumer<T>
     {
+        @Nullable
+        private List<T> trackedMutations;
+
         @Nullable
         private List<T> accordMutations;
 
         @Nullable
-        private List<T> normalMutations;
+        private List<T> untrackedMutations;
 
         private SplitMutations() {}
 
+        public List<T> trackedMutations()
+        {
+            return trackedMutations;
+        }
+
         public List<T> accordMutations()
         {
             return accordMutations;
         }
 
-        public List<T> normalMutations()
+        public List<T> untrackedMutations()
         {
-            return normalMutations;
+            return untrackedMutations;
         }
 
         @Override
-        public void consume(@Nullable T accordMutation, @Nullable T normalMutation, List<T> mutations, int mutationIndex)
+        public void consume(@Nullable T accordMutation, @Nullable T untrackedMutation, @Nullable T trackedMutation, List<T> mutations, int mutationIndex)
         {
             // Avoid allocating an ArrayList in common single mutation single system case
-            if (mutations.size() == 1 && (accordMutation != null ^ normalMutation != null))
+            if (mutations.size() == 1 && (accordMutation != null ^ (untrackedMutation != null || trackedMutation != null)))
             {
                 if (accordMutation != null)
                     accordMutations = mutations;
+                else if (untrackedMutation != null)
+                    untrackedMutations = mutations;
                 else
-                    normalMutations = mutations;
+                    trackedMutations = mutations;
                 return;
             }
 
@@ -171,57 +181,103 @@ public class ConsensusMigrationMutationHelper
                     accordMutations = new ArrayList<>(Math.min(mutations.size(), 10));
                 accordMutations.add(accordMutation);
             }
-            if (normalMutation != null)
+            if (untrackedMutation != null)
+            {
+                if (untrackedMutations == null)
+                    untrackedMutations = new ArrayList<>(Math.min(mutations.size(), 10));
+                untrackedMutations.add(untrackedMutation);
+            }
+
+            if (trackedMutation != null)
             {
-                if (normalMutations == null)
-                    normalMutations = new ArrayList<>(Math.min(mutations.size(), 10));
-                normalMutations.add(normalMutation);
+                if (trackedMutations == null)
+                    trackedMutations = new ArrayList<>(Math.min(mutations.size(), 10));
+                trackedMutations.add(trackedMutation);
             }
         }
     }
 
     public interface SplitConsumer<T extends IMutation>
     {
-        void consume(@Nullable T accordMutation, @Nullable T normalMutation, List<T> mutations, int mutationIndex);
+        void consume(@Nullable T accordMutation, @Nullable T untrackedMutation, @Nullable T trackedMutation, List<T> mutations, int mutationIndex);
     }
 
-    public static <T extends IMutation> SplitMutations<T> splitMutationsIntoAccordAndNormal(ClusterMetadata cm, List<T> mutations)
+    public static <T extends IMutation> void splitMutations(ClusterMetadata cm, List<T> mutations, SplitConsumer<T> splitConsumer)
     {
-        SplitMutations<T> splitMutations = new SplitMutations<>();
-        splitMutationsIntoAccordAndNormal(cm, mutations, splitMutations);
-        return splitMutations;
+        for (int i=0,mi=mutations.size(); i<mi; i++)
+        {
+            SplitMutation<T> splitMutation = instance.splitMutation(mutations.get(i), cm);
+            splitConsumer.consume(splitMutation.accordMutation, splitMutation.untrackedMutation, splitMutation.trackedMutation, mutations, i);
+        }
     }
 
-    public static <T extends IMutation> void splitMutationsIntoAccordAndNormal(ClusterMetadata cm, List<T> mutations, SplitConsumer<T> splitConsumer)
+    private static boolean isTrackedMutation(IMutation mutation)
     {
-        for (int i=0,mi=mutations.size(); i<mi; i++)
+        return Schema.instance.getKeyspaceMetadata(mutation.getKeyspaceName()).params.replicationType.isTracked();
+    }
+
+
+    /**
+     * Splits mutations into tracked/untracked/accord mutations
+     */
+    public static <T extends IMutation> SplitMutations<T> splitMutations(ClusterMetadata cm, List<T> mutations)
+    {
+        SplitMutations<T> result = new SplitMutations<>();
+
+        for (T mutation : mutations)
         {
-            SplitMutation<T> splitMutation = instance.splitMutationIntoAccordAndNormal(mutations.get(i), cm);
-            splitConsumer.consume(splitMutation.accordMutation, splitMutation.normalMutation, mutations, i);
+            SplitMutation<T> split = instance.splitMutation(mutation, cm);
+
+            if (split.accordMutation != null)
+            {
+                if (result.accordMutations == null)
+                    result.accordMutations = new ArrayList<>();
+                result.accordMutations.add(split.accordMutation);
+            }
+
+            if (split.untrackedMutation != null)
+            {
+                if (result.untrackedMutations == null)
+                    result.untrackedMutations = new ArrayList<>();
+                result.untrackedMutations.add(split.untrackedMutation);
+            }
+
+            if (split.trackedMutation != null)
+            {
+                if (result.trackedMutations == null)
+                    result.trackedMutations = new ArrayList<>();
+                result.trackedMutations.add(split.trackedMutation);
+            }
         }
+
+        return result;
     }
 
     /**
-     * Result of splitting a mutation across Accord and non-transactional boundaries
+     * Result of splitting a mutation across Accord and untracked boundaries
      */
     public static class SplitMutation<T extends IMutation>
     {
         @Nullable
         public final T accordMutation;
         @Nullable
-        public final T normalMutation;
+        public final T untrackedMutation;
+        @Nullable
+        public final T trackedMutation;
 
-        public SplitMutation(@Nullable T accordMutation, @Nullable T normalMutation)
+        public SplitMutation(@Nullable T accordMutation, @Nullable T untrackedMutation, @Nullable T trackedMutation)
         {
             this.accordMutation = accordMutation;
-            this.normalMutation = normalMutation;
+            this.untrackedMutation = untrackedMutation;
+            this.trackedMutation = trackedMutation;
         }
     }
 
-    public <T extends IMutation> SplitMutation<T> splitMutationIntoAccordAndNormal(T mutation, ClusterMetadata cm)
+    public <T extends IMutation> SplitMutation<T> splitMutation(T mutation, ClusterMetadata cm)
     {
+        boolean isTracked = isTrackedMutation(mutation);
         if (mutation.potentialTxnConflicts().allowed)
-            return new SplitMutation<>(null, mutation);
+            return new SplitMutation<>(null, isTracked ? null : mutation, isTracked ? mutation : null);
 
         Token token = mutation.key().getToken();
         Predicate<TableId> isAccordUpdate = tableId -> tokenShouldBeWrittenThroughAccord(cm, tableId, token, TransactionalMode::nonSerialWritesThroughAccord, TransactionalMigrationFromMode::nonSerialWritesThroughAccord);
@@ -232,7 +288,8 @@ public class ConsensusMigrationMutationHelper
             checkState((accordMutation == null ? false : accordMutation.hasUpdateForTable(pu.metadata().id))
                        || (normalMutation == null ? false : normalMutation.hasUpdateForTable(pu.metadata().id)),
                        "All partition updates should still be present after splitting");
-        return new SplitMutation(accordMutation, normalMutation);
+
+        return new SplitMutation(accordMutation, isTracked ? null : normalMutation, isTracked ? normalMutation : null);
     }
 
     public IAccordResult<TxnResult> mutateWithAccordAsync(ClusterMetadata cm, Mutation mutation, @Nullable ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime, PreserveTimestamp preserveTimestamps)
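As the isTrackedMutation helper above shows, the tracked versus untracked slot is decided purely from the keyspace's replication type. Roughly (sketch only, assuming a mutation that touches no Accord-migrating tables):

    boolean tracked = Schema.instance.getKeyspaceMetadata(mutation.getKeyspaceName())
                                     .params.replicationType.isTracked();
    SplitMutation<Mutation> split = ConsensusMigrationMutationHelper.instance().splitMutation(mutation, cm);
    // tracked == true  -> split.trackedMutation == mutation, split.untrackedMutation == null
    // tracked == false -> split.untrackedMutation == mutation, split.trackedMutation == null
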
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java
new file mode 100644
index 0000000000..9b683cf7e2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.Schema;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Distributed tests for unlogged batches with mutation tracking.
+ * Tests the new capability to run batches against tracked keyspaces.
+ */
+public class TrackedBatchTest extends TestBaseImpl
+{
+    private static final String TRACKED_KS = "tracked_ks";
+    private static final String UNTRACKED_KS = "untracked_ks";
+
+    @Test
+    public void testMultipleTrackedMutations() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP)
+                                                            .set("mutation_tracking_enabled", "true"))
+                                      .start())
+        {
+            // Create tracked keyspace
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE " + TRACKED_KS + " WITH replication = " +
+                                              "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                                              "AND replication_type='tracked';"));
+            cluster.schemaChange("CREATE TABLE " + TRACKED_KS + ".tbl (k int primary key, v int);");
+
+            // Verify keyspace is tracked
+            String keyspaceName = TRACKED_KS;
+            cluster.get(1).runOnInstance(() -> {
+                KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName);
+                assertEquals(ReplicationType.tracked, keyspace.params.replicationType);
+            });
+
+            // Execute unlogged batch with multiple mutations to tracked keyspace
+            String batchCql = "BEGIN UNLOGGED BATCH\n" +
+                              "  INSERT INTO " + TRACKED_KS + ".tbl (k, v) VALUES (1, 100);\n" +
+                              "  INSERT INTO " + TRACKED_KS + ".tbl (k, v) VALUES (2, 200);\n" +
+                              "  INSERT INTO " + TRACKED_KS + ".tbl (k, v) VALUES (3, 300);\n" +
+                              "APPLY BATCH";
+
+            cluster.coordinator(1).execute(batchCql, ConsistencyLevel.QUORUM);
+
+            // Verify all mutations succeeded
+            Object[][] result = cluster.coordinator(1).execute("SELECT * FROM " + TRACKED_KS + ".tbl", ConsistencyLevel.QUORUM);
+            assertEquals(3, result.length);
+
+            // Verify data on all nodes (at RF=3, all nodes should have the data)
+            for (int i = 1; i <= 3; i++)
+            {
+                IInvokableInstance node = cluster.get(i);
+                Object[][] nodeResult = node.executeInternal("SELECT * FROM " + TRACKED_KS + ".tbl");
+                assertEquals(3, nodeResult.length);
+            }
+        }
+    }
+
+    @Test
+    public void testMixedTrackedUntracked() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP)
+                                                            .set("mutation_tracking_enabled", "true"))
+                                      .start())
+        {
+            // Create tracked keyspace
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE " + TRACKED_KS + " WITH replication = " +
+                                              "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                                              "AND replication_type='tracked';"));
+            cluster.schemaChange("CREATE TABLE " + TRACKED_KS + ".tbl (k int primary key, v int);");
+
+            // Create untracked keyspace
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE " + UNTRACKED_KS + " WITH replication = " +
+                                              "{'class': 'SimpleStrategy', 'replication_factor': 3};"));
+            cluster.schemaChange("CREATE TABLE " + UNTRACKED_KS + ".tbl (k int primary key, v int);");
+
+            // Verify keyspace types
+            String trackedKsName = TRACKED_KS;
+            String untrackedKsName = UNTRACKED_KS;
+            cluster.get(1).runOnInstance(() -> {
+                KeyspaceMetadata tracked = Schema.instance.getKeyspaceMetadata(trackedKsName);
+                assertEquals(ReplicationType.tracked, tracked.params.replicationType);
+
+                KeyspaceMetadata untracked = Schema.instance.getKeyspaceMetadata(untrackedKsName);
+                assertEquals(ReplicationType.untracked, untracked.params.replicationType);
+            });
+
+            // Execute mixed batch
+            String batchCql = "BEGIN UNLOGGED BATCH\n" +
+                              "  INSERT INTO " + TRACKED_KS + ".tbl (k, v) VALUES (1, 100);\n" +
+                              "  INSERT INTO " + UNTRACKED_KS + ".tbl (k, v) VALUES (2, 200);\n" +
+                              "  INSERT INTO " + TRACKED_KS + ".tbl (k, v) VALUES (3, 300);\n" +
+                              "  INSERT INTO " + UNTRACKED_KS + ".tbl (k, v) VALUES (4, 400);\n" +
+                              "APPLY BATCH";
+
+            cluster.coordinator(1).execute(batchCql, ConsistencyLevel.QUORUM);
+
+            // Verify tracked keyspace mutations
+            Object[][] trackedResult = cluster.coordinator(1).execute("SELECT * FROM " + TRACKED_KS + ".tbl", ConsistencyLevel.QUORUM);
+            assertEquals(2, trackedResult.length);
+
+            // Verify untracked keyspace mutations
+            Object[][] untrackedResult = cluster.coordinator(1).execute("SELECT * FROM " + UNTRACKED_KS + ".tbl", ConsistencyLevel.QUORUM);
+            assertEquals(2, untrackedResult.length);
+
+            // Verify data on all nodes
+            for (int i = 1; i <= 3; i++)
+            {
+                IInvokableInstance node = cluster.get(i);
+
+                Object[][] trackedNodeResult = node.executeInternal("SELECT * FROM " + TRACKED_KS + ".tbl");
+                assertEquals(2, trackedNodeResult.length);
+
+                Object[][] untrackedNodeResult = node.executeInternal("SELECT * FROM " + UNTRACKED_KS + ".tbl");
+                assertEquals(2, untrackedNodeResult.length);
+            }
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index 773b30f677..df71961483 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -220,16 +220,16 @@ public class HintsServiceTest
                     int count = 0;
 
                     @Override
-                    public <T extends IMutation> SplitMutation<T> splitMutationIntoAccordAndNormal(T mutation, ClusterMetadata cm)
+                    public <T extends IMutation> SplitMutation<T> splitMutation(T mutation, ClusterMetadata cm)
                     {
                         if (count > 2)
-                            return super.splitMutationIntoAccordAndNormal(mutation, cm);
+                            return super.splitMutation(mutation, cm);
 
                         SplitMutation split;
                         if (count % 2 == 0)
-                            split = new SplitMutation(mutation, null);
+                            split = new SplitMutation(mutation, null, null);
                         else
-                            split = new SplitMutation<>(null, mutation);
+                            split = new SplitMutation<>(null, mutation, null);
                         count++;
                         return split;
                     }
diff --git a/test/unit/org/apache/cassandra/service/ConsensusMigrationMutationHelperTest.java b/test/unit/org/apache/cassandra/service/ConsensusMigrationMutationHelperTest.java
new file mode 100644
index 0000000000..d880771517
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/ConsensusMigrationMutationHelperTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaTransformations;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper;
+import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutations;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.transformations.AlterSchema;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for ConsensusMigrationMutationHelper mutation splitting logic.
+ *
+ * Focuses on tracked vs untracked keyspace separation without testing Accord integration.
+ */
+public class ConsensusMigrationMutationHelperTest
+{
+    private static final String TRACKED_KS = "tracked_ks";
+    private static final String UNTRACKED_KS = "untracked_ks";
+    private static final String TABLE = "test_table";
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    @Before
+    public void setUp() throws Exception
+    {
+        // Initialize cluster metadata service for each test
+        ClusterMetadataService.unsetInstance();
+        ClusterMetadataService.setInstance(ClusterMetadataTestHelper.syncInstanceForTest());
+        ClusterMetadataService.instance().log().unsafeBootstrapForTesting(FBUtilities.getBroadcastAddressAndPort());
+
+        // Create tracked keyspace with table
+        TableMetadata trackedTable = TableMetadata.builder(TRACKED_KS, TABLE)
+                                                  .addPartitionKeyColumn("pk", UTF8Type.instance)
+                                                  .addRegularColumn("value", UTF8Type.instance)
+                                                  .build();
+        ClusterMetadataTestHelper.createKeyspace(TRACKED_KS, KeyspaceParams.simple(3, ReplicationType.tracked));
+        ClusterMetadataTestHelper.commit(new AlterSchema(SchemaTransformations.addTable(trackedTable, false)));
+
+        // Create untracked keyspace with table
+        TableMetadata untrackedTable = TableMetadata.builder(UNTRACKED_KS, TABLE)
+                                                    .addPartitionKeyColumn("pk", UTF8Type.instance)
+                                                    .addRegularColumn("value", UTF8Type.instance)
+                                                    .build();
+        ClusterMetadataTestHelper.createKeyspace(UNTRACKED_KS, KeyspaceParams.simple(3));
+        ClusterMetadataTestHelper.commit(new AlterSchema(SchemaTransformations.addTable(untrackedTable, false)));
+    }
+
+    @Test
+    public void testSplitTrackedOnly()
+    {
+        ClusterMetadata cm = ClusterMetadata.current();
+        List<Mutation> mutations = new ArrayList<>();
+
+        // Create 3 mutations to tracked keyspace
+        mutations.add(createMutation(TRACKED_KS, "key1", "value1"));
+        mutations.add(createMutation(TRACKED_KS, "key2", "value2"));
+        mutations.add(createMutation(TRACKED_KS, "key3", "value3"));
+
+        SplitMutations<Mutation> split = ConsensusMigrationMutationHelper.splitMutations(cm, mutations);
+
+        // All mutations should go to tracked bucket
+        assertNotNull(split.trackedMutations());
+        assertEquals(3, split.trackedMutations().size());
+
+        // Other buckets should be null
+        assertNull(split.untrackedMutations());
+        assertNull(split.accordMutations());
+    }
+
+    @Test
+    public void testSplitUntrackedOnly()
+    {
+        ClusterMetadata cm = ClusterMetadata.current();
+        List<Mutation> mutations = new ArrayList<>();
+
+        // Create 3 mutations to untracked keyspace
+        mutations.add(createMutation(UNTRACKED_KS, "key1", "value1"));
+        mutations.add(createMutation(UNTRACKED_KS, "key2", "value2"));
+        mutations.add(createMutation(UNTRACKED_KS, "key3", "value3"));
+
+        SplitMutations<Mutation> split = ConsensusMigrationMutationHelper.splitMutations(cm, mutations);
+
+        // All mutations should go to untracked bucket
+        assertNotNull(split.untrackedMutations());
+        assertEquals(3, split.untrackedMutations().size());
+
+        // Other buckets should be null
+        assertNull(split.trackedMutations());
+        assertNull(split.accordMutations());
+    }
+
+    @Test
+    public void testSplitMixedTrackedUntracked()
+    {
+        ClusterMetadata cm = ClusterMetadata.current();
+        List<Mutation> mutations = new ArrayList<>();
+
+        // Create mixed mutations: 2 tracked, 2 untracked
+        mutations.add(createMutation(TRACKED_KS, "key1", "value1"));
+        mutations.add(createMutation(UNTRACKED_KS, "key2", "value2"));
+        mutations.add(createMutation(TRACKED_KS, "key3", "value3"));
+        mutations.add(createMutation(UNTRACKED_KS, "key4", "value4"));
+
+        SplitMutations<Mutation> split = ConsensusMigrationMutationHelper.splitMutations(cm, mutations);
+
+        // Check tracked bucket
+        assertNotNull(split.trackedMutations());
+        assertEquals(2, split.trackedMutations().size());
+        assertEquals("key1", UTF8Type.instance.compose(split.trackedMutations().get(0).key().getKey()));
+        assertEquals("key3", UTF8Type.instance.compose(split.trackedMutations().get(1).key().getKey()));
+
+        // Check untracked bucket
+        assertNotNull(split.untrackedMutations());
+        assertEquals(2, split.untrackedMutations().size());
+        assertEquals("key2", UTF8Type.instance.compose(split.untrackedMutations().get(0).key().getKey()));
+        assertEquals("key4", UTF8Type.instance.compose(split.untrackedMutations().get(1).key().getKey()));
+
+        // Accord should be null
+        assertNull(split.accordMutations());
+    }
+
+    private Mutation createMutation(String keyspace, String partitionKey, String value)
+    {
+        TableMetadata table = Schema.instance.getTableMetadata(keyspace, TABLE);
+        return new RowUpdateBuilder(table, 0, partitionKey)
+               .add("value", value)
+               .build();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

