This is an automated email from the ASF dual-hosted git repository.

aber pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
     new 4b5bed734d CEP-45: Add metrics for Mutation Tracking
4b5bed734d is described below

commit 4b5bed734d34e47ce7dcdf0b3bc8e3bbf36d285e
Author: Aparna Naik <[email protected]>
AuthorDate: Wed Nov 5 15:42:12 2025 -0800

    CEP-45: Add metrics for Mutation Tracking
    
    patch by Aparna Naik; reviewed by Abe Ratnofsky and Blake Eggleston for CASSANDRA-20986
---
 src/java/org/apache/cassandra/journal/Journal.java |  16 +
 .../metrics/CassandraMetricsRegistry.java          |   1 +
 .../cassandra/metrics/MutationTrackingMetrics.java |  57 ++++
 .../cassandra/replication/CoordinatorLog.java      |  26 ++
 .../cassandra/replication/MutationJournal.java     |   5 +
 .../replication/MutationTrackingService.java       |  23 +-
 .../org/apache/cassandra/replication/Shard.java    |  10 +
 .../replication/UnreconciledMutations.java         |   5 +
 .../test/tracking/MutationTrackingMetricsTest.java | 357 +++++++++++++++++++++
 9 files changed, 498 insertions(+), 2 deletions(-)

diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index 136d70a9dc..c404c325be 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -832,6 +832,22 @@ public class Journal<K, V> implements Shutdownable
         }
     }
 
+    public long getDiskSpaceUsed()
+    {
+        long totalSize = 0;
+
+        try (ReferencedSegments<K, V> refs = selectAndReference(s -> true))
+        {
+            for (Segment<K, V> segment : refs.all())
+            {
+                File dataFile = segment.descriptor.fileFor(Component.DATA);
+                if (dataFile.exists())
+                    totalSize += dataFile.length();
+            }
+        }
+        return totalSize;
+    }
+
     private ActiveSegment<K, V> createSegment()
     {
         Descriptor descriptor = Descriptor.create(directory, nextSegmentId.getAndIncrement(), params.userVersion());
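
A note on the pattern in getDiskSpaceUsed() above: selectAndReference returns a reference-counted view of the matching segments, so the try-with-resources block guarantees no segment is deleted while its data file is being sized. Below is a minimal, self-contained sketch of that idiom; the SegmentStub/ReferencedView types are hypothetical stand-ins, not the Cassandra classes.

    import java.io.Closeable;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RefCountedSizingSketch
    {
        // Hypothetical stand-in for a journal segment; not the Cassandra class.
        static final class SegmentStub
        {
            final long sizeOnDisk;
            final AtomicInteger refs = new AtomicInteger(1);
            SegmentStub(long sizeOnDisk) { this.sizeOnDisk = sizeOnDisk; }
        }

        // Ref-counted view: pins the segments on construction, releases on close.
        static final class ReferencedView implements Closeable
        {
            final List<SegmentStub> segments;
            ReferencedView(List<SegmentStub> segments)
            {
                this.segments = segments;
                segments.forEach(s -> s.refs.incrementAndGet());
            }
            @Override
            public void close()
            {
                // A real implementation would delete a segment once refs hits 0.
                segments.forEach(s -> s.refs.decrementAndGet());
            }
        }

        // Mirrors the shape of Journal#getDiskSpaceUsed: sum sizes only while
        // the reference is held, so no segment vanishes mid-iteration.
        static long diskSpaceUsed(List<SegmentStub> candidates)
        {
            long total = 0;
            try (ReferencedView view = new ReferencedView(candidates))
            {
                for (SegmentStub segment : view.segments)
                    total += segment.sizeOnDisk;
            }
            return total;
        }

        public static void main(String[] args)
        {
            System.out.println(diskSpaceUsed(List.of(new SegmentStub(1 << 20), new SegmentStub(512))));
        }
    }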
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 44bcf2d6cf..133acedcbf 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -139,6 +139,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
                                    .add(KeyspaceMetrics.TYPE_NAME)
                                    .add(MemtablePool.TYPE_NAME)
                                    .add(MessagingMetrics.TYPE_NAME)
+                                   .add(MutationTrackingMetrics.TYPE_NAME)
                                    .add(MutualTlsMetrics.TYPE_NAME)
                                    .add(PaxosMetrics.TYPE_NAME)
                                    .add(ReadRepairMetrics.TYPE_NAME)
diff --git a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java
new file mode 100644
index 0000000000..822d7c5ac1
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class MutationTrackingMetrics
+{
+    public static final String TYPE_NAME = "MutationTracking";
+    private static final MetricNameFactory factory = new DefaultNameFactory(TYPE_NAME);
+
+    public static final MutationTrackingMetrics instance = new MutationTrackingMetrics();
+
+    public final Counter broadcastOffsetsDiscovered; // Newly-witnessed offsets discovered via broadcast
+    public final Counter writeTimeOffsetsDiscovered; // Newly-witnessed offsets discovered at write time
+    public final Histogram readSummarySize; // Read summary sizes
+    public final Gauge<Long> unreconciledMutationCount; // Number of unreconciled mutations
+    public final Gauge<Long> journalDiskSpaceUsed; // Size of MutationJournal on disk
+
+    @SuppressWarnings("Convert2MethodRef")
+    private MutationTrackingMetrics()
+    {
+        broadcastOffsetsDiscovered = Metrics.counter(factory.createMetricName("BroadcastOffsetsDiscovered"));
+        writeTimeOffsetsDiscovered = Metrics.counter(factory.createMetricName("WriteTimeOffsetsDiscovered"));
+        readSummarySize = Metrics.histogram(factory.createMetricName("ReadSummarySize"), true);
+        unreconciledMutationCount = Metrics.register(
+                factory.createMetricName("UnreconciledMutationCount"),
+                () -> MutationTrackingService.instance.getUnreconciledMutationCount()
+        );
+        journalDiskSpaceUsed = Metrics.register(
+                factory.createMetricName("JournalDiskSpaceUsed"),
+                () -> MutationJournal.instance.getDiskSpaceUsed()
+        );
+    }
+}
\ No newline at end of file
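
Because the new metrics use DefaultNameFactory, they should surface over JMX under the usual org.apache.cassandra.metrics domain with type=MutationTracking. A hedged sketch of reading them from a remote JMX client follows; the object-name pattern is assumed from Cassandra's standard type=/name= metric convention and should be verified against a running node (7199 is the default JMX port).

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class MutationTrackingJmxProbe
    {
        public static void main(String[] args) throws Exception
        {
            // Default Cassandra JMX endpoint; adjust host/port for your deployment.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // Assumed object names, following the type=/name= metric convention.
                ObjectName counter = new ObjectName("org.apache.cassandra.metrics:type=MutationTracking,name=BroadcastOffsetsDiscovered");
                ObjectName gauge = new ObjectName("org.apache.cassandra.metrics:type=MutationTracking,name=JournalDiskSpaceUsed");
                System.out.println("broadcast offsets discovered: " + mbs.getAttribute(counter, "Count"));
                System.out.println("journal disk space used: " + mbs.getAttribute(gauge, "Value"));
            }
        }
    }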
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 9dac9af12a..bd39863d67 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.tcm.ClusterMetadata;
@@ -206,8 +207,14 @@ public abstract class CoordinatorLog
 
     private void updateWitnessedReplicatedOffsets(Offsets offsets, int onNodeId)
     {
+        // Track newly-witnessed offsets from broadcasts (single-element array so the lambda can mutate it)
+        int[] newlyWitnessedCount = {0};
+
         witnessedOffsets.get(onNodeId).addAll(offsets, (ignore, start, end) ->
         {
+            // Count the newly-witnessed offsets in this range
+            newlyWitnessedCount[0] += (end - start + 1);
+
             for (int offset = start; offset <= end; ++offset)
             {
                // TODO (desired): use the fact that Offsets are ordered to optimise this look up
@@ -219,6 +226,9 @@ public abstract class CoordinatorLog
                 logger.trace("done applying WRO, now {}", witnessedOffsets);
             }
         });
+
+        // Record metric for newly witnessed offsets only
+        MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.inc(newlyWitnessedCount[0]);
     }
 
     private void updatePersistedReplicatedOffsets(Offsets offsets, int onNodeId)
@@ -273,6 +283,19 @@ public abstract class CoordinatorLog
         }
     }
 
+    public long getUnreconciledCount()
+    {
+        lock.readLock().lock();
+        try
+        {
+            return unreconciledMutations.size();
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     boolean startWriting(Mutation mutation)
     {
         lock.writeLock().lock();
@@ -302,6 +325,9 @@ public abstract class CoordinatorLog
             if (!witnessedOffsets.get(localNodeId).add(offset))
                 return;
 
+            // Track write-time discovery of newly-witnessed offset
+            MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.inc();
+
             unreconciledMutations.finishWriting(mutation);
 
             if (remoteReplicasWitnessed(offset))
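
A side note on the int[] newlyWitnessedCount = {0} idiom above: Java lambdas can only capture effectively-final locals, so a single-element array is the conventional escape hatch for accumulating from inside a callback. A minimal, self-contained illustration (safe here because the callback runs synchronously on the calling thread); forEachRange is a hypothetical stand-in for Offsets.addAll:

    import java.util.function.IntConsumer;

    public class LambdaAccumulatorSketch
    {
        // Invokes the callback once per inclusive range; stands in for Offsets.addAll.
        static void forEachRange(int[][] ranges, IntConsumer perRangeSize)
        {
            for (int[] range : ranges)
                perRangeSize.accept(range[1] - range[0] + 1);
        }

        public static void main(String[] args)
        {
            // A plain `int count` would not compile inside the lambda:
            // captured locals must be effectively final.
            int[] count = {0};
            forEachRange(new int[][]{ {0, 4}, {10, 10} }, size -> count[0] += size);
            System.out.println(count[0]); // 6: five offsets in [0,4] plus one in [10,10]
        }
    }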
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java
index e2a2a82e64..21ff7d49c6 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -746,4 +746,9 @@ public class MutationJournal
     {
         return journal.countStaticSegmentsForTesting();
     }
+
+    public long getDiskSpaceUsed()
+    {
+        return journal.getDiskSpaceUsed();
+    }
 }
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 5a65ca7d47..aae8fe4fa3 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -57,6 +57,7 @@ import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
@@ -419,7 +420,9 @@ public class MutationTrackingService
         shardLock.readLock().lock();
         try
         {
-            return getOrCreateShards(tableId).createSummaryForKey(key, tableId, includePending);
+            MutationSummary summary = getOrCreateShards(tableId).createSummaryForKey(key, tableId, includePending);
+            MutationTrackingMetrics.instance.readSummarySize.update(summary.size());
+            return summary;
         }
         finally
         {
@@ -432,7 +435,9 @@ public class MutationTrackingService
         shardLock.readLock().lock();
         try
         {
-            return getOrCreateShards(tableId).createSummaryForRange(range, tableId, includePending);
+            MutationSummary summary = getOrCreateShards(tableId).createSummaryForRange(range, tableId, includePending);
+            MutationTrackingMetrics.instance.readSummarySize.update(summary.size());
+            return summary;
         }
         finally
         {
@@ -459,6 +464,20 @@ public class MutationTrackingService
         }
     }
 
+    public long getUnreconciledMutationCount()
+    {
+        if (!isStarted())
+            return 0L;
+
+        final long[] count = {0L};
+        forEachKeyspace(ks -> {
+            ks.forEachShard(shard -> {
+                count[0] += shard.getUnreconciledCount();
+            });
+        });
+        return count[0];
+    }
+
     public void collectLocallyMissingMutations(MutationSummary remoteSummary, Log2OffsetsMap.Mutable into)
     {
         shardLock.readLock().lock();
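
Worth noting about the gauges wired to getUnreconciledMutationCount() in MutationTrackingMetrics: a codahale Gauge is just a supplier, so the aggregation above runs only when something actually polls the gauge, not on every write. A small sketch of that pull-based behaviour using the plain dropwizard-metrics API:

    import java.util.concurrent.atomic.AtomicLong;

    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.MetricRegistry;

    public class LazyGaugeSketch
    {
        public static void main(String[] args)
        {
            MetricRegistry registry = new MetricRegistry();
            AtomicLong unreconciled = new AtomicLong();

            // The lambda is stored, not evaluated; it runs on each getValue() call.
            Gauge<Long> gauge = registry.register("UnreconciledMutationCount", (Gauge<Long>) unreconciled::get);

            unreconciled.set(42);
            System.out.println(gauge.getValue()); // 42 - computed at read time
            unreconciled.set(0);
            System.out.println(gauge.getValue()); // 0 - no stale cached value
        }
    }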
diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java
index 924ae7f357..a264b0141c 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -314,6 +314,16 @@ public class Shard
         return getOrCreate(logId.asLong());
     }
 
+    public long getUnreconciledCount()
+    {
+        long count = 0;
+        for (CoordinatorLog log : logs.values())
+        {
+            count += log.getUnreconciledCount();
+        }
+        return count;
+    }
+
     @Nonnull
     private CoordinatorLog get(CoordinatorLogId logId)
     {
diff --git a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
index 314e9303d7..22c1550230 100644
--- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
@@ -248,6 +248,11 @@ public class UnreconciledMutations
         return statesMap.isEmpty();
     }
 
+    public int size()
+    {
+        return statesMap.size();
+    }
+
     static UnreconciledMutations loadFromJournal(Node2OffsetsMap witnessedOffsets, int localNodeId)
     {
         UnreconciledMutations result = new UnreconciledMutations();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java
new file mode 100644
index 0000000000..54d3c6b973
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MutationTrackingMetricsTest extends TestBaseImpl
+{
+    private static final String CREATE_KEYSPACE =
+            "CREATE KEYSPACE %s WITH replication = " +
+                    "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                    "AND replication_type = 'tracked'";
+
+    private static final String CREATE_TABLE =
+            "CREATE TABLE %s.tbl (pk int PRIMARY KEY, val text)";
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testWriteTimeOffsetsDiscoveredMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                        .with(Feature.GOSSIP))
+                .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get initial write-time discovery counts on all nodes
+            long initialNode1Count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long initialNode2Count = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long initialNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+
+            // Perform writes with QUORUM - each write goes to at least 2 replicas
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                        ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Wait for all nodes to discover offsets at write time
+            // With RF=3, each node should discover offsets and the total should be at least numWrites * 3
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long node1Delta = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode1Count;
+                          long node2Delta = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode2Count;
+                          long node3Delta = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode3Count;
+                          long totalDiscovered = node1Delta + node2Delta + node3Delta;
+
+                          return node1Delta > 0 && node2Delta > 0 && node3Delta > 0 && totalDiscovered >= (long) numWrites * 3;
+                      });
+
+            // Verify final counts
+            long afterNode1Count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long afterNode2Count = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long afterNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+
+            long node1Delta = afterNode1Count - initialNode1Count;
+            long node2Delta = afterNode2Count - initialNode2Count;
+            long node3Delta = afterNode3Count - initialNode3Count;
+
+            assertThat(node1Delta)
+                    .as("Node 1 should have discovered offsets at write time")
+                    .isGreaterThan(0L);
+
+            assertThat(node2Delta)
+                    .as("Node 2 should have discovered offsets at write time")
+                    .isGreaterThan(0L);
+
+            assertThat(node3Delta)
+                    .as("Node 3 should have discovered offsets at write time")
+                    .isGreaterThan(0L);
+
+            long totalDiscovered = node1Delta + node2Delta + node3Delta;
+            assertThat(totalDiscovered)
+                    .as("Total write-time discoveries across all nodes should 
be at least %d (RF=3)", numWrites * 3)
+                    .isGreaterThanOrEqualTo((long) numWrites * 3);
+        }
+    }
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testBroadcastOffsetsDiscoveredMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                        .with(Feature.GOSSIP))
+                .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Record the initial broadcast metric on node 3, since we are about to block this node from receiving mutations
+            long initialNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+
+            // Block node 3 from receiving mutation writes (but allow broadcast messages)
+            cluster.filters().verbs(Verb.MUTATION_REQ.id).to(3).drop();
+
+            // Write data - nodes 1 and 2 will get it, node 3 won't
+            int numWrites = 5;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                        ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Verify node 3 missed the writes
+            Object[][] node3Before = cluster.coordinator(3).execute(
+                    withKeyspace("SELECT * FROM %s.tbl"), ConsistencyLevel.ONE);
+            assertThat(node3Before.length)
+                .as("Node 3 should have no data (was blocked)")
+                .isEqualTo(0);
+
+            // Broadcast offsets from node 1 to other nodes
+            // This tells node 3 about mutations it's missing
+            cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            // Wait for broadcasts to propagate to node 3
+            long[] previousCount = {0};
+            Awaitility.await()
+                    .atMost(Duration.ofSeconds(5))
+                    .pollInterval(Duration.ofMillis(100))
+                    .until(() -> {
+                        long currentCount = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+                        boolean hasDiscoveredOffsets = currentCount > initialNode3Count;
+                        boolean isStable = hasDiscoveredOffsets && currentCount == previousCount[0];
+                        previousCount[0] = currentCount;
+                        return isStable;
+                    });
+
+            // Get the count after first broadcast
+            long afterFirstBroadcast = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+
+            // Broadcast the same offsets again (duplicate) - should NOT increment metric
+            cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            // Wait for duplicate broadcast to propagate, then verify metric stayed the same
+            // We poll to ensure the broadcast had time to arrive, then check it didn't increment
+            Awaitility.await()
+                      .pollDelay(Duration.ofMillis(200))
+                      .atMost(Duration.ofSeconds(2))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+                          return count == afterFirstBroadcast; // Should remain at the same value (duplicate doesn't increment)
+                      });
+
+            // Clear filter to allow reconciliation
+            cluster.filters().reset();
+
+            // Read from node 3 to trigger reconciliation using broadcast offsets
+            // Node 3 knows it's missing data (from broadcast offsets) and will request it
+            // Poll for reconciliation to complete
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(10))
+                      .pollInterval(Duration.ofMillis(200))
+                      .until(() -> {
+                          Object[][] result = cluster.coordinator(3).execute(
+                                  withKeyspace("SELECT * FROM %s.tbl"),
+                                  ConsistencyLevel.QUORUM);
+                          return result.length == numWrites;
+                      });
+
+            // Verify all row data is present after reconciliation
+            Object[][] result = cluster.coordinator(3).execute(
+                    withKeyspace("SELECT * FROM %s.tbl"),
+                    ConsistencyLevel.QUORUM);
+            assertThat(result.length)
+                .as("Should return all rows after reconciliation")
+                .isEqualTo(numWrites);
+
+            // Check metrics after reconciliation - if reconciliation worked, broadcasts happened
+            long afterNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+            long node3Delta = afterNode3Count - initialNode3Count;
+
+            // Node 3 was blocked before and now must have applied broadcast offsets
+            assertThat(node3Delta)
+                .as("Node 3 should have applied broadcast offsets")
+                .isGreaterThan(0L);
+        }
+    }
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testReadSummarySizeMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                        .with(Feature.GOSSIP))
+                .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get initial metric value from coordinator node
+            long initialSize = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.readSummarySize.getCount());
+
+            // Insert test data
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                        ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Execute read operations (metric should increment once per read request)
+            int numReads = 10;
+            for (int i = 0; i < numReads; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"),
+                        ConsistencyLevel.QUORUM, i);
+            }
+
+            // Verify the metric incremented by at least twice the number of reads,
+            // as each read creates TWO summaries: an initial one (before the read) and a secondary one (after the read).
+            // The second summary detects concurrent writes during read execution for proper reconciliation.
+            long afterSize = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.readSummarySize.getCount());
+
+            long delta = afterSize - initialSize;
+            assertThat(delta)
+                .as("Should have at least twice of %d summaries", numReads)
+                .isGreaterThanOrEqualTo(2L * numReads);
+        }
+    }
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testUnreconciledMutationCountMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                        .with(Feature.GOSSIP))
+                .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get initial unreconciled count (should be 0)
+            long initialCount = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(initialCount)
+                .as("Initial unreconciled count should be 0")
+                .isEqualTo(0L);
+
+            // Block node 3 from receiving messages from node 1
+            cluster.filters().verbs(Verb.MUTATION_REQ.id).from(1).to(3).drop();
+
+            // Write with QUORUM (only nodes 1 and 2 will receive writes, node 3 won't)
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                        ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Node 1 should now have unreconciled mutations (since node 3 didn't get them)
+            long afterWrites = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(afterWrites)
+                .as("Expected %d unreconciled mutations (node 3 blocked)", 
numWrites)
+                .isEqualTo((long) numWrites);
+
+            // Clear filters to allow reconciliation
+            cluster.filters().reset();
+
+            // Perform reads to trigger reconciliation
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"),
+                        ConsistencyLevel.QUORUM, i);
+            }
+
+            // Wait for reconciliation to complete
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+                          return count == 0;
+                      });
+
+            // Verify reconciliation actually happened
+            long afterReconcile = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(afterReconcile)
+                .as("Unreconciled count should be 0 after reconciliation")
+                .isEqualTo(0L);
+        }
+    }
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testJournalDiskSpaceUsedMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(1)
+                .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                        .set("commitlog_segment_size", "1MiB")) // Create a 
smaller size segment
+                .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get initial disk space - expected to be 2 * 1024 * 1024, as 2 segments are allocated by default
+            long initialSpace = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.journalDiskSpaceUsed.getValue());
+
+            // Write enough data to fill the 1MiB segment and force new segment creation
+            int numWrites = 200;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(
+                        withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                        ConsistencyLevel.ONE, i, "test-" + i);
+
+                // Close segment every 20 writes to create multiple segments
+                if (i % 20 == 0 && i > 0)
+                    cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty());
+            }
+
+            // Verify disk space increased
+            long afterWrites = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.journalDiskSpaceUsed.getValue());
+
+            assertThat(afterWrites)
+                .as("Disk space should increase after writes: before=%d", 
initialSpace)
+                .isGreaterThan(initialSpace);
+        }
+    }
+}
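
One pattern in the tests above deserves a callout: testBroadcastOffsetsDiscoveredMetric proves a negative ("the duplicate broadcast did NOT increment the counter") by polling until the value is both above its baseline and unchanged across two consecutive polls. A condensed, self-contained sketch of that Awaitility idiom, with an AtomicLong standing in for the remote metric reads:

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicLong;

    import org.awaitility.Awaitility;

    public class StablePollSketch
    {
        public static void main(String[] args)
        {
            AtomicLong metric = new AtomicLong();          // stand-in for the remote counter
            long baseline = metric.get();

            new Thread(() -> metric.addAndGet(5)).start(); // simulated broadcast arriving

            long[] previous = {0};
            Awaitility.await()
                      .atMost(Duration.ofSeconds(5))
                      .pollInterval(Duration.ofMillis(100))
                      .until(() -> {
                          long current = metric.get();
                          // Done only once the value has moved past the baseline
                          // AND held steady for one full poll interval.
                          boolean stable = current > baseline && current == previous[0];
                          previous[0] = current;
                          return stable;
                      });

            System.out.println("settled at " + metric.get());
        }
    }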


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
