This is an automated email from the ASF dual-hosted git repository. blambov pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new a565711056 Fix data corruption handling issues a565711056 is described below commit a565711056c859398d0b26081b46e71d2076de1d Author: Branimir Lambov <branimir.lam...@datastax.com> AuthorDate: Mon Jul 24 12:23:27 2023 +0300 Fix data corruption handling issues Treat AssertionError as corruption, assert positive deletion timestamps and TTLs and treat localDeletionTime < TTL as invalid. patch by Branimir Lambov; reviewed by Berenguer Blasi for CASSANDRA-18676 --- src/java/org/apache/cassandra/db/DeletionTime.java | 8 ++--- src/java/org/apache/cassandra/db/rows/Cell.java | 41 +++++++++++++++++----- .../cassandra/db/rows/UnfilteredSerializer.java | 8 ++--- .../io/sstable/SSTableIdentityIterator.java | 4 +-- .../org/apache/cassandra/db/ReadCommandTest.java | 10 +++--- .../CorruptedSSTablesCompactionsTest.java | 2 +- 6 files changed, 49 insertions(+), 24 deletions(-) diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index a0450a0fd1..45a5bf841a 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -70,8 +70,7 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory private DeletionTime(long markedForDeleteAt, long localDeletionTime) { - this.markedForDeleteAt = markedForDeleteAt; - this.localDeletionTimeUnsignedInteger = Cell.deletionTimeLongToUnsignedInteger(localDeletionTime); + this(markedForDeleteAt, Cell.deletionTimeLongToUnsignedInteger(localDeletionTime)); } private DeletionTime(long markedForDeleteAt, int localDeletionTimeUnsignedInteger) @@ -116,12 +115,13 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory } /** - * check if this deletion time is valid - localDeletionTime can never be negative + * Check if this deletion time is valid - markedForDeleteAt can only be negative if the deletion is LIVE. 
+ * localDeletionTime is not checked as it is stored as an unsigned int and cannot be negative. * @return true if it is valid */ public boolean validate() { - return true; + return markedForDeleteAt >= 0 || isLive(); } @Override diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java index 9850d08af3..fffcca821a 100644 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -21,12 +21,14 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; -import org.apache.cassandra.config.*; -import org.apache.cassandra.db.*; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DeletionPurger; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.marshal.ValueAccessor; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.utils.CassandraUInt; import org.apache.cassandra.utils.memory.ByteBufferCloner; @@ -213,6 +215,29 @@ public abstract class Cell<V> extends ColumnData protected abstract int localDeletionTimeAsUnsignedInt(); + /** + * Handle unsigned encoding and potentially invalid localDeletionTime. + */ + public static long decodeLocalDeletionTime(long localDeletionTime, int ttl, DeserializationHelper helper) + { + if (localDeletionTime >= ttl) + return localDeletionTime; // fast path, positive and valid signed 32-bit integer + + if (localDeletionTime < 0) + { + // Overflown signed int, decode to long. The result is guaranteed > ttl (and any signed int) + return helper.version < MessagingService.VERSION_50 + ? 
INVALID_DELETION_TIME + : deletionTimeUnsignedIntegerToLong((int) localDeletionTime); + } + + if (ttl == LivenessInfo.EXPIRED_LIVENESS_TTL) + return localDeletionTime; // ttl is already expired, localDeletionTime is valid + else + return INVALID_DELETION_TIME; // Invalid as it can't occur without corruption and would cause negative + // timestamp on expiry. + } + /** * The serialization format for cell is: * [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][ path ][ value size ][ value ] @@ -317,11 +342,11 @@ public abstract class Cell<V> extends ColumnData } } - if (localDeletionTime < 0) - localDeletionTime = helper.version < MessagingService.VERSION_50 - ? INVALID_DELETION_TIME - : deletionTimeUnsignedIntegerToLong((int) localDeletionTime); - + if (timestamp < 0) + throw new IOException("Invalid negative timestamp: " + timestamp); + if (ttl < 0) + throw new IOException("Invalid TTL: " + ttl); + localDeletionTime = decodeLocalDeletionTime(localDeletionTime, ttl, helper); return accessor.factory().cell(column, timestamp, ttl, localDeletionTime, value, path); } diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index c1802cf012..cfbcad177c 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -589,12 +589,12 @@ public class UnfilteredSerializer if (hasTimestamp) { long timestamp = header.readTimestamp(in); + assert timestamp >= 0 : "Invalid negative timestamp " + timestamp; int ttl = hasTTL ? header.readTTL(in) : LivenessInfo.NO_TTL; + assert ttl >= 0 : "Invalid TTL " + ttl; long localDeletionTime = hasTTL ? header.readLocalDeletionTime(in) : LivenessInfo.NO_EXPIRATION_TIME; - if (localDeletionTime < 0) - localDeletionTime = helper.version < MessagingService.VERSION_50 - ? 
Cell.INVALID_DELETION_TIME - : Cell.deletionTimeUnsignedIntegerToLong((int) localDeletionTime); + + localDeletionTime = Cell.decodeLocalDeletionTime(localDeletionTime, ttl, helper); rowLiveness = LivenessInfo.withExpirationTime(timestamp, ttl, localDeletionTime); } diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index b8355fa518..789bc4bd5f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -138,7 +138,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat { return iterator.hasNext(); } - catch (IndexOutOfBoundsException | VIntOutOfRangeException e) + catch (IndexOutOfBoundsException | VIntOutOfRangeException | AssertionError e) { sstable.markSuspect(); throw new CorruptSSTableException(e, filename); @@ -163,7 +163,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat { return doCompute(); } - catch (IndexOutOfBoundsException e) + catch (IndexOutOfBoundsException | VIntOutOfRangeException | AssertionError e) { sstable.markSuspect(); throw new CorruptSSTableException(e, filename); diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index f1efe97722..16de3f061c 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -798,21 +798,21 @@ public class ReadCommandTest long nowInSec = FBUtilities.nowInSeconds(); // A simple tombstone - new RowUpdateBuilder(cfs.metadata(), 0, keys[0]).clustering("cc").delete("a").build().apply(); + new RowUpdateBuilder(cfs.metadata(), 100, keys[0]).clustering("cc").delete("a").build().apply(); // Collection with an associated complex deletion - PartitionUpdate.SimpleBuilder builder = 
PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0); + PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(100); builder.row("cc").add("c", ImmutableSet.of("element1", "element2")); builder.buildAsMutation().apply(); // RangeTombstone and a row (not covered by the RT). The row contains a regular tombstone which will not be // purged. This is to prevent the partition from being fully purged and removed from the final results - new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, keys[2]).addRangeTombstone("aa", "bb").build().apply(); + new RowUpdateBuilder(cfs.metadata(), nowInSec, 100L, keys[2]).addRangeTombstone("aa", "bb").build().apply(); new RowUpdateBuilder(cfs.metadata(), nowInSec+ 1000, 1000L, keys[2]).clustering("cc").delete("a").build().apply(); // Partition with 2 rows, one fully deleted - new RowUpdateBuilder(cfs.metadata.get(), 0, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply(); - RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply(); + new RowUpdateBuilder(cfs.metadata.get(), 100, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply(); + RowUpdateBuilder.deleteRow(cfs.metadata(), 100, keys[3], "cc").apply(); Util.flush(cfs); cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); diff --git a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java index 26adfb108c..dc78276254 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java @@ -225,7 +225,7 @@ public class CorruptedSSTablesCompactionsTest try { cfs.forceMajorCompaction(); - break; + break; // After all corrupted sstables are marked as such, compaction of the rest should 
succeed. } catch (Exception e) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org