This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new e36aeb49e0 Log duplicate rows found during nodetool verify and scrub
e36aeb49e0 is described below
commit e36aeb49e008568a2f551bb749bbb55aeaa80a72
Author: Josh McKenzie <[email protected]>
AuthorDate: Mon Aug 1 14:37:26 2022 -0400
Log duplicate rows found during nodetool verify and scrub
Patch by Marcus Eriksson; reviewed by Josh McKenzie for CASSANDRA-17789
Co-authored-by: Marcus Eriksson <[email protected]>
Co-authored-by: Josh McKenzie <[email protected]>
---
 CHANGES.txt                                       |  1 +
 .../apache/cassandra/db/compaction/Scrubber.java  | 14 ++++-
 .../apache/cassandra/db/compaction/Verifier.java  | 63 +++++++++++++++++++++-
 3 files changed, 75 insertions(+), 3 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4dda88e314..78456e28ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Log duplicate rows sharing a partition key found in verify and scrub (CASSANDRA-17789)
 * Add separate thread pool for Secondary Index building so it doesn't block compactions (CASSANDRA-17781)
 * Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774)
 * When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to know when to check the ring but checks that the ring wasn't changed in -Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we publish load stats (CASSANDRA-17776)
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 5228d2f43f..c8518ce5e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -381,7 +381,7 @@ public class Scrubber implements Closeable
     @SuppressWarnings("resource")
     private UnfilteredRowIterator getIterator(DecoratedKey key)
     {
-        RowMergingSSTableIterator rowMergingIterator = new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key));
+        RowMergingSSTableIterator rowMergingIterator = new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key), outputHandler);
         return reinsertOverflowedTTLRows ? new FixNegativeLocalDeletionTimeIterator(rowMergingIterator,
                                                                                     outputHandler,
                                                                                     negativeLocalDeletionInfoMetrics) : rowMergingIterator;
@@ -565,10 +565,12 @@ public class Scrubber implements Closeable
     private static class RowMergingSSTableIterator extends WrappingUnfilteredRowIterator
     {
         Unfiltered nextToOffer = null;
+        private final OutputHandler output;
 
-        RowMergingSSTableIterator(UnfilteredRowIterator source)
+        RowMergingSSTableIterator(UnfilteredRowIterator source, OutputHandler output)
         {
             super(source);
+            this.output = output;
         }
 
         @Override
@@ -584,6 +586,7 @@ public class Scrubber implements Closeable
 
             if (next.isRow())
             {
+                boolean logged = false;
                 while (wrapped.hasNext())
                 {
                     Unfiltered peek = wrapped.next();
@@ -595,6 +598,13 @@ public class Scrubber implements Closeable
 
                     // Duplicate row, merge it.
                     next = Rows.merge((Row) next, (Row) peek);
+
+                    if (!logged)
+                    {
+                        String partitionKey = metadata().partitionKeyType.getString(partitionKey().getKey());
+                        output.warn("Duplicate row detected in " + metadata().keyspace + '.' + metadata().name + ": " + partitionKey + " " + next.clustering().toString(metadata()));
+                        logged = true;
+                    }
                 }
             }
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 29eb29951b..bad050a6cc 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -22,6 +22,9 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.LocalPartitioner;
@@ -57,7 +60,9 @@ import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.time.Instant;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -317,9 +322,40 @@ public class Verifier implements Closeable
                 if (key == null || dataSize > dataFile.length())
                     markAndThrow(new RuntimeException(String.format("key = %s, dataSize=%d, dataFile.length() = %d", key, dataSize, dataFile.length())));
 
-                //mimic the scrub read path, intentionally unused
                 try (UnfilteredRowIterator iterator = SSTableIdentityIterator.create(sstable, dataFile, key))
                 {
+                    Row first = null;
+                    int duplicateRows = 0;
+                    long minTimestamp = Long.MAX_VALUE;
+                    long maxTimestamp = Long.MIN_VALUE;
+                    while (iterator.hasNext())
+                    {
+                        Unfiltered uf = iterator.next();
+                        if (uf.isRow())
+                        {
+                            Row row = (Row) uf;
+                            if (first != null && first.clustering().equals(row.clustering()))
+                            {
+                                duplicateRows++;
+                                for (Cell cell : row.cells())
+                                {
+                                    maxTimestamp = Math.max(cell.timestamp(), maxTimestamp);
+                                    minTimestamp = Math.min(cell.timestamp(), minTimestamp);
+                                }
+                            }
+                            else
+                            {
+                                if (duplicateRows > 0)
+                                    logDuplicates(key, first, duplicateRows, minTimestamp, maxTimestamp);
+                                duplicateRows = 0;
+                                first = row;
+                                maxTimestamp = Long.MIN_VALUE;
+                                minTimestamp = Long.MAX_VALUE;
+                            }
+                        }
+                    }
+                    if (duplicateRows > 0)
+                        logDuplicates(key, first, duplicateRows, minTimestamp, maxTimestamp);
                 }
 
                 if ( (prevKey != null && prevKey.compareTo(key) > 0) || !key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex )
@@ -350,6 +386,31 @@ public class Verifier implements Closeable
         outputHandler.output("Verify of " + sstable + " succeeded. All " + goodRows + " rows read successfully");
     }
 
+    private void logDuplicates(DecoratedKey key, Row first, int duplicateRows, long minTimestamp, long maxTimestamp)
+    {
+        String keyString = sstable.metadata().partitionKeyType.getString(key.getKey());
+        long firstMaxTs = Long.MIN_VALUE;
+        long firstMinTs = Long.MAX_VALUE;
+        for (Cell cell : first.cells())
+        {
+            firstMaxTs = Math.max(firstMaxTs, cell.timestamp());
+            firstMinTs = Math.min(firstMinTs, cell.timestamp());
+        }
+        outputHandler.output(String.format("%d duplicate rows found for [%s %s] in %s.%s (%s), timestamps: [first row (%s, %s)], [duplicates (%s, %s, eq:%b)]",
+                                           duplicateRows,
+                                           keyString, first.clustering().toString(sstable.metadata()),
+                                           sstable.metadata().keyspace,
+                                           sstable.metadata().name,
+                                           sstable,
+                                           dateString(firstMinTs), dateString(firstMaxTs),
+                                           dateString(minTimestamp), dateString(maxTimestamp), minTimestamp == maxTimestamp));
+    }
+
+    private String dateString(long time)
+    {
+        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(time)).toString();
+    }
+
     /**
      * Use the fact that check(..) is called with sorted tokens - we keep a pointer in to the normalized ranges
      * and only bump the pointer if the key given is out of range. This is done to avoid calling .contains(..) many
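
For readers following along, here is a minimal standalone sketch of the summary line produced by logDuplicates() above. It is not part of the patch: the keyspace, table, key, clustering and timestamp values are made up for illustration, and only JDK classes are used; it simply replays the same format string and the same microsecond-to-Instant conversion as dateString().

import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class DuplicateRowLogExample
{
    // Mirrors Verifier.dateString(): cell timestamps are microseconds since the epoch.
    static String dateString(long timeMicros)
    {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(timeMicros)).toString();
    }

    public static void main(String[] args)
    {
        // Hypothetical cell timestamps (microseconds) for the first row and its duplicates.
        long firstMinTs = 1659379046000000L, firstMaxTs = 1659379046000000L;
        long minTimestamp = 1659379047000000L, maxTimestamp = 1659379048000000L;

        // Same format string as Verifier.logDuplicates(), with placeholder values.
        System.out.println(String.format("%d duplicate rows found for [%s %s] in %s.%s (%s), timestamps: [first row (%s, %s)], [duplicates (%s, %s, eq:%b)]",
                                         2, "k1", "c1", "ks", "tbl", "example-sstable",
                                         dateString(firstMinTs), dateString(firstMaxTs),
                                         dateString(minTimestamp), dateString(maxTimestamp), minTimestamp == maxTimestamp));
    }
}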
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]