This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e36aeb49e0 Log duplicate rows found during nodetool verify and scrub
e36aeb49e0 is described below

commit e36aeb49e008568a2f551bb749bbb55aeaa80a72
Author: Josh McKenzie <[email protected]>
AuthorDate: Mon Aug 1 14:37:26 2022 -0400

    Log duplicate rows found during nodetool verify and scrub
    
    Patch by Marcus Eriksson; reviewed by Josh McKenzie for CASSANDRA-17789
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Josh McKenzie <[email protected]>
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/db/compaction/Scrubber.java   | 14 ++++-
 .../apache/cassandra/db/compaction/Verifier.java   | 63 +++++++++++++++++++++-
 3 files changed, 75 insertions(+), 3 deletions(-)
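
For readers skimming the diffs below: the Verifier change walks each partition's rows in clustering order, and since duplicates of the same logical row are adjacent in that order, it counts consecutive rows sharing a clustering and tracks the min/max cell timestamps of the duplicate run. A rough standalone sketch of that run-detection logic, using plain Java stand-ins rather than Cassandra's internal Row/Unfiltered/Cell types (all names here are illustrative, not the project's API):

    import java.util.List;

    public class DuplicateRunSketch
    {
        // Illustrative stand-in for a row: a clustering key plus its cell
        // timestamps (microseconds since the epoch, as Cassandra write timestamps are).
        record SimpleRow(String clustering, long[] cellTimestamps) {}

        // Rows arrive sorted by clustering, so duplicates form adjacent runs.
        static void detect(List<SimpleRow> sortedRows)
        {
            SimpleRow first = null;
            int duplicateRows = 0;
            long minTs = Long.MAX_VALUE, maxTs = Long.MIN_VALUE;
            for (SimpleRow row : sortedRows)
            {
                if (first != null && first.clustering().equals(row.clustering()))
                {
                    duplicateRows++;
                    for (long ts : row.cellTimestamps())
                    {
                        maxTs = Math.max(ts, maxTs);
                        minTs = Math.min(ts, minTs);
                    }
                }
                else
                {
                    if (duplicateRows > 0) // flush the finished run before starting a new one
                        System.out.printf("%d duplicate row(s) for %s, ts range [%d, %d]%n",
                                          duplicateRows, first.clustering(), minTs, maxTs);
                    duplicateRows = 0;
                    first = row;
                    maxTs = Long.MIN_VALUE;
                    minTs = Long.MAX_VALUE;
                }
            }
            if (duplicateRows > 0) // flush the trailing run
                System.out.printf("%d duplicate row(s) for %s, ts range [%d, %d]%n",
                                  duplicateRows, first.clustering(), minTs, maxTs);
        }

        public static void main(String[] args)
        {
            detect(List.of(new SimpleRow("c1", new long[]{ 1L }),
                           new SimpleRow("c1", new long[]{ 2L, 3L }),
                           new SimpleRow("c2", new long[]{ 4L })));
        }
    }

Note that, as in the patch, the min/max timestamps cover only the duplicates' cells, not the first row's; the first row's own range is computed separately at logging time.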

diff --git a/CHANGES.txt b/CHANGES.txt
index 4dda88e314..78456e28ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Log duplicate rows sharing a partition key found in verify and scrub (CASSANDRA-17789)
  * Add separate thread pool for Secondary Index building so it doesn't block compactions (CASSANDRA-17781)
  * Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774)
  * When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to know when to check the ring but checks that the ring wasn't changed in -Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we publish load stats (CASSANDRA-17776)
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 5228d2f43f..c8518ce5e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -381,7 +381,7 @@ public class Scrubber implements Closeable
     @SuppressWarnings("resource")
     private UnfilteredRowIterator getIterator(DecoratedKey key)
     {
-        RowMergingSSTableIterator rowMergingIterator = new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key));
+        RowMergingSSTableIterator rowMergingIterator = new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key), outputHandler);
         return reinsertOverflowedTTLRows ? new FixNegativeLocalDeletionTimeIterator(rowMergingIterator,
                                                                                     outputHandler,
                                                                                     negativeLocalDeletionInfoMetrics) : rowMergingIterator;
@@ -565,10 +565,12 @@ public class Scrubber implements Closeable
     private static class RowMergingSSTableIterator extends WrappingUnfilteredRowIterator
     {
         Unfiltered nextToOffer = null;
+        private final OutputHandler output;
 
-        RowMergingSSTableIterator(UnfilteredRowIterator source)
+        RowMergingSSTableIterator(UnfilteredRowIterator source, OutputHandler output)
         {
             super(source);
+            this.output = output;
         }
 
         @Override
@@ -584,6 +586,7 @@ public class Scrubber implements Closeable
 
             if (next.isRow())
             {
+                boolean logged = false;
                 while (wrapped.hasNext())
                 {
                     Unfiltered peek = wrapped.next();
@@ -595,6 +598,13 @@ public class Scrubber implements Closeable
 
                     // Duplicate row, merge it.
                     next = Rows.merge((Row) next, (Row) peek);
+
+                    if (!logged)
+                    {
+                        String partitionKey = metadata().partitionKeyType.getString(partitionKey().getKey());
+                        output.warn("Duplicate row detected in " + metadata().keyspace + '.' + metadata().name + ": " + partitionKey + " " + next.clustering().toString(metadata()));
+                        logged = true;
+                    }
                 }
             }
 
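The Scrubber change above threads its OutputHandler into RowMergingSSTableIterator so that when consecutive rows with the same clustering are merged, a warning is emitted once per merged row (the `logged` flag) rather than once per extra duplicate. A hedged sketch of that merge-and-warn-once shape, again with illustrative stand-ins for Cassandra's Row type and Rows.merge:

    import java.util.*;
    import java.util.function.BinaryOperator;

    public class MergeAndWarnOnce
    {
        // Stand-in for Cassandra's Row: clustering key -> column values.
        record Row(String clustering, Map<String, String> cells) {}

        // Merge consecutive rows sharing a clustering, warning only on the first
        // duplicate (mirrors the `logged` flag added to RowMergingSSTableIterator).
        static List<Row> mergeSorted(List<Row> sorted, BinaryOperator<Row> merge)
        {
            List<Row> out = new ArrayList<>();
            int i = 0;
            while (i < sorted.size())
            {
                Row next = sorted.get(i++);
                boolean logged = false;
                while (i < sorted.size() && sorted.get(i).clustering().equals(next.clustering()))
                {
                    next = merge.apply(next, sorted.get(i++));
                    if (!logged)
                    {
                        System.out.println("Duplicate row detected at clustering " + next.clustering());
                        logged = true;
                    }
                }
                out.add(next);
            }
            return out;
        }

        public static void main(String[] args)
        {
            BinaryOperator<Row> merge = (a, b) -> {
                Map<String, String> cells = new HashMap<>(a.cells());
                cells.putAll(b.cells()); // later cells win; a crude stand-in for Rows.merge
                return new Row(a.clustering(), cells);
            };
            System.out.println(mergeSorted(List.of(new Row("c1", Map.of("v", "1")),
                                                   new Row("c1", Map.of("v", "2")),
                                                   new Row("c2", Map.of("v", "3"))),
                                           merge));
        }
    }
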
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 29eb29951b..bad050a6cc 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -22,6 +22,9 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 import org.apache.cassandra.dht.LocalPartitioner;
@@ -57,7 +60,9 @@ import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.time.Instant;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -317,9 +322,40 @@ public class Verifier implements Closeable
                     if (key == null || dataSize > dataFile.length())
                        markAndThrow(new RuntimeException(String.format("key = %s, dataSize=%d, dataFile.length() = %d", key, dataSize, dataFile.length())));
 
-                    //mimic the scrub read path, intentionally unused
                     try (UnfilteredRowIterator iterator = SSTableIdentityIterator.create(sstable, dataFile, key))
                     {
+                        Row first = null;
+                        int duplicateRows = 0;
+                        long minTimestamp = Long.MAX_VALUE;
+                        long maxTimestamp = Long.MIN_VALUE;
+                        while (iterator.hasNext())
+                        {
+                            Unfiltered uf = iterator.next();
+                            if (uf.isRow())
+                            {
+                                Row row = (Row) uf;
+                                if (first != null && first.clustering().equals(row.clustering()))
+                                {
+                                    duplicateRows++;
+                                    for (Cell cell : row.cells())
+                                    {
+                                        maxTimestamp = Math.max(cell.timestamp(), maxTimestamp);
+                                        minTimestamp = Math.min(cell.timestamp(), minTimestamp);
+                                    }
+                                }
+                                else
+                                {
+                                    if (duplicateRows > 0)
+                                        logDuplicates(key, first, duplicateRows, minTimestamp, maxTimestamp);
+                                    duplicateRows = 0;
+                                    first = row;
+                                    maxTimestamp = Long.MIN_VALUE;
+                                    minTimestamp = Long.MAX_VALUE;
+                                }
+                            }
+                        }
+                        if (duplicateRows > 0)
+                            logDuplicates(key, first, duplicateRows, minTimestamp, maxTimestamp);
                     }
 
                     if ( (prevKey != null && prevKey.compareTo(key) > 0) || !key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex )
@@ -350,6 +386,31 @@ public class Verifier implements Closeable
         outputHandler.output("Verify of " + sstable + " succeeded. All " + goodRows + " rows read successfully");
     }
 
+    private void logDuplicates(DecoratedKey key, Row first, int duplicateRows, long minTimestamp, long maxTimestamp)
+    {
+        String keyString = sstable.metadata().partitionKeyType.getString(key.getKey());
+        long firstMaxTs = Long.MIN_VALUE;
+        long firstMinTs = Long.MAX_VALUE;
+        for (Cell cell : first.cells())
+        {
+            firstMaxTs = Math.max(firstMaxTs, cell.timestamp());
+            firstMinTs = Math.min(firstMinTs, cell.timestamp());
+        }
+        outputHandler.output(String.format("%d duplicate rows found for [%s %s] in %s.%s (%s), timestamps: [first row (%s, %s)], [duplicates (%s, %s, eq:%b)]",
+                                           duplicateRows,
+                                           keyString, first.clustering().toString(sstable.metadata()),
+                                           sstable.metadata().keyspace,
+                                           sstable.metadata().name,
+                                           sstable,
+                                           dateString(firstMinTs), dateString(firstMaxTs),
+                                           dateString(minTimestamp), dateString(maxTimestamp), minTimestamp == maxTimestamp));
+    }
+
+    private String dateString(long time)
+    {
+        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(time)).toString();
+    }
+
     /**
      * Use the fact that check(..) is called with sorted tokens - we keep a pointer in to the normalized ranges
      * and only bump the pointer if the key given is out of range. This is done to avoid calling .contains(..) many

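A small detail in the new dateString() helper above: Cassandra cell timestamps are microseconds since the Unix epoch, so they must be scaled to milliseconds before Instant.ofEpochMilli can render them. A minimal illustration (the sample value is this commit's AuthorDate expressed in microseconds, chosen here purely as an example):

    import java.time.Instant;
    import java.util.concurrent.TimeUnit;

    public class MicrosToInstant
    {
        // Cassandra cell timestamps are microseconds since the Unix epoch;
        // Instant.ofEpochMilli expects milliseconds, so convert first.
        static String dateString(long micros)
        {
            return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(micros)).toString();
        }

        public static void main(String[] args)
        {
            long ts = 1659379046000000L; // 2022-08-01T18:37:26Z in microseconds
            System.out.println(dateString(ts)); // prints 2022-08-01T18:37:26Z
        }
    }
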

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
