This is an automated email from the ASF dual-hosted git repository.
blambov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a565711056 Fix data corruption handling issues
a565711056 is described below
commit a565711056c859398d0b26081b46e71d2076de1d
Author: Branimir Lambov <[email protected]>
AuthorDate: Mon Jul 24 12:23:27 2023 +0300
Fix data corruption handling issues
Treat AssertionError as corruption, assert positive deletion timestamps and
TTLs and treat localDeletionTime < TTL as invalid.
patch by Branimir Lambov; reviewed by Berenguer Blasi for CASSANDRA-18676
---
src/java/org/apache/cassandra/db/DeletionTime.java | 8 ++---
src/java/org/apache/cassandra/db/rows/Cell.java | 41 +++++++++++++++++-----
.../cassandra/db/rows/UnfilteredSerializer.java | 8 ++---
.../io/sstable/SSTableIdentityIterator.java | 4 +--
.../org/apache/cassandra/db/ReadCommandTest.java | 10 +++---
.../CorruptedSSTablesCompactionsTest.java | 2 +-
6 files changed, 49 insertions(+), 24 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java
b/src/java/org/apache/cassandra/db/DeletionTime.java
index a0450a0fd1..45a5bf841a 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -70,8 +70,7 @@ public class DeletionTime implements
Comparable<DeletionTime>, IMeasurableMemory
private DeletionTime(long markedForDeleteAt, long localDeletionTime)
{
- this.markedForDeleteAt = markedForDeleteAt;
- this.localDeletionTimeUnsignedInteger =
Cell.deletionTimeLongToUnsignedInteger(localDeletionTime);
+ this(markedForDeleteAt,
Cell.deletionTimeLongToUnsignedInteger(localDeletionTime));
}
private DeletionTime(long markedForDeleteAt, int
localDeletionTimeUnsignedInteger)
@@ -116,12 +115,13 @@ public class DeletionTime implements
Comparable<DeletionTime>, IMeasurableMemory
}
/**
- * check if this deletion time is valid - localDeletionTime can never be
negative
+ * Check if this deletion time is valid - markedForDeleteAt can only
be negative if the deletion is LIVE.
+ * localDeletionTime is not checked as it is stored as an unsigned int and
cannot be negative.
* @return true if it is valid
*/
public boolean validate()
{
- return true;
+ return markedForDeleteAt >= 0 || isLive();
}
@Override
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java
b/src/java/org/apache/cassandra/db/rows/Cell.java
index 9850d08af3..fffcca821a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -21,12 +21,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.utils.CassandraUInt;
import org.apache.cassandra.utils.memory.ByteBufferCloner;
@@ -213,6 +215,29 @@ public abstract class Cell<V> extends ColumnData
protected abstract int localDeletionTimeAsUnsignedInt();
+ /**
+ * Handle unsigned encoding and potentially invalid localDeletionTime.
+ */
+ public static long decodeLocalDeletionTime(long localDeletionTime, int
ttl, DeserializationHelper helper)
+ {
+ if (localDeletionTime >= ttl)
+ return localDeletionTime; // fast path, positive and valid
signed 32-bit integer
+
+ if (localDeletionTime < 0)
+ {
+ // Overflown signed int, decode to long. The result is guaranteed
> ttl (and any signed int)
+ return helper.version < MessagingService.VERSION_50
+ ? INVALID_DELETION_TIME
+ : deletionTimeUnsignedIntegerToLong((int)
localDeletionTime);
+ }
+
+ if (ttl == LivenessInfo.EXPIRED_LIVENESS_TTL)
+ return localDeletionTime; // ttl is already expired,
localDeletionTime is valid
+ else
+ return INVALID_DELETION_TIME; // Invalid as it can't occur
without corruption and would cause negative
+ // timestamp on expiry.
+ }
+
/**
* The serialization format for cell is:
* [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][
path ][ value size ][ value ]
@@ -317,11 +342,11 @@ public abstract class Cell<V> extends ColumnData
}
}
- if (localDeletionTime < 0)
- localDeletionTime = helper.version <
MessagingService.VERSION_50
- ? INVALID_DELETION_TIME
- : deletionTimeUnsignedIntegerToLong((int)
localDeletionTime);
-
+ if (timestamp < 0)
+ throw new IOException("Invalid negative timestamp: " +
timestamp);
+ if (ttl < 0)
+ throw new IOException("Invalid TTL: " + ttl);
+ localDeletionTime = decodeLocalDeletionTime(localDeletionTime,
ttl, helper);
return accessor.factory().cell(column, timestamp, ttl,
localDeletionTime, value, path);
}
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index c1802cf012..cfbcad177c 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -589,12 +589,12 @@ public class UnfilteredSerializer
if (hasTimestamp)
{
long timestamp = header.readTimestamp(in);
+ assert timestamp >= 0 : "Invalid negative timestamp " +
timestamp;
int ttl = hasTTL ? header.readTTL(in) : LivenessInfo.NO_TTL;
+ assert ttl >= 0 : "Invalid TTL " + ttl;
long localDeletionTime = hasTTL ?
header.readLocalDeletionTime(in) : LivenessInfo.NO_EXPIRATION_TIME;
- if (localDeletionTime < 0)
- localDeletionTime = helper.version <
MessagingService.VERSION_50
- ? Cell.INVALID_DELETION_TIME
- :
Cell.deletionTimeUnsignedIntegerToLong((int) localDeletionTime);
+
+ localDeletionTime =
Cell.decodeLocalDeletionTime(localDeletionTime, ttl, helper);
rowLiveness = LivenessInfo.withExpirationTime(timestamp, ttl,
localDeletionTime);
}
diff --git
a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index b8355fa518..789bc4bd5f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -138,7 +138,7 @@ public class SSTableIdentityIterator implements
Comparable<SSTableIdentityIterat
{
return iterator.hasNext();
}
- catch (IndexOutOfBoundsException | VIntOutOfRangeException e)
+ catch (IndexOutOfBoundsException | VIntOutOfRangeException |
AssertionError e)
{
sstable.markSuspect();
throw new CorruptSSTableException(e, filename);
@@ -163,7 +163,7 @@ public class SSTableIdentityIterator implements
Comparable<SSTableIdentityIterat
{
return doCompute();
}
- catch (IndexOutOfBoundsException e)
+ catch (IndexOutOfBoundsException | VIntOutOfRangeException |
AssertionError e)
{
sstable.markSuspect();
throw new CorruptSSTableException(e, filename);
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index f1efe97722..16de3f061c 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -798,21 +798,21 @@ public class ReadCommandTest
long nowInSec = FBUtilities.nowInSeconds();
// A simple tombstone
- new RowUpdateBuilder(cfs.metadata(), 0,
keys[0]).clustering("cc").delete("a").build().apply();
+ new RowUpdateBuilder(cfs.metadata(), 100,
keys[0]).clustering("cc").delete("a").build().apply();
// Collection with an associated complex deletion
- PartitionUpdate.SimpleBuilder builder =
PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0);
+ PartitionUpdate.SimpleBuilder builder =
PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(100);
builder.row("cc").add("c", ImmutableSet.of("element1", "element2"));
builder.buildAsMutation().apply();
// RangeTombstone and a row (not covered by the RT). The row contains
a regular tombstone which will not be
// purged. This is to prevent the partition from being fully purged
and removed from the final results
- new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L,
keys[2]).addRangeTombstone("aa", "bb").build().apply();
+ new RowUpdateBuilder(cfs.metadata(), nowInSec, 100L,
keys[2]).addRangeTombstone("aa", "bb").build().apply();
new RowUpdateBuilder(cfs.metadata(), nowInSec+ 1000, 1000L,
keys[2]).clustering("cc").delete("a").build().apply();
// Partition with 2 rows, one fully deleted
- new RowUpdateBuilder(cfs.metadata.get(), 0,
keys[3]).clustering("bb").add("a",
ByteBufferUtil.bytes("a")).delete("b").build().apply();
- RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply();
+ new RowUpdateBuilder(cfs.metadata.get(), 100,
keys[3]).clustering("bb").add("a",
ByteBufferUtil.bytes("a")).delete("b").build().apply();
+ RowUpdateBuilder.deleteRow(cfs.metadata(), 100, keys[3], "cc").apply();
Util.flush(cfs);
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable,
111, null));
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
index 26adfb108c..dc78276254 100644
---
a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
@@ -225,7 +225,7 @@ public class CorruptedSSTablesCompactionsTest
try
{
cfs.forceMajorCompaction();
- break;
+ break; // After all corrupted sstables are marked as such,
compaction of the rest should succeed.
}
catch (Exception e)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]