This is an automated email from the ASF dual-hosted git repository.

blambov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a565711056 Fix data corruption handling issues
a565711056 is described below

commit a565711056c859398d0b26081b46e71d2076de1d
Author: Branimir Lambov <branimir.lam...@datastax.com>
AuthorDate: Mon Jul 24 12:23:27 2023 +0300

    Fix data corruption handling issues
    
    Treat AssertionError as corruption, assert positive deletion timestamps and
    TTLs and treat localDeletionTime < TTL as invalid.
    
    patch by Branimir Lambov; reviewed by Berenguer Blasi for CASSANDRA-18676
---
 src/java/org/apache/cassandra/db/DeletionTime.java |  8 ++---
 src/java/org/apache/cassandra/db/rows/Cell.java    | 41 +++++++++++++++++-----
 .../cassandra/db/rows/UnfilteredSerializer.java    |  8 ++---
 .../io/sstable/SSTableIdentityIterator.java        |  4 +--
 .../org/apache/cassandra/db/ReadCommandTest.java   | 10 +++---
 .../CorruptedSSTablesCompactionsTest.java          |  2 +-
 6 files changed, 49 insertions(+), 24 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java 
b/src/java/org/apache/cassandra/db/DeletionTime.java
index a0450a0fd1..45a5bf841a 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -70,8 +70,7 @@ public class DeletionTime implements 
Comparable<DeletionTime>, IMeasurableMemory
 
     private DeletionTime(long markedForDeleteAt, long localDeletionTime)
     {
-        this.markedForDeleteAt = markedForDeleteAt;
-        this.localDeletionTimeUnsignedInteger = 
Cell.deletionTimeLongToUnsignedInteger(localDeletionTime);
+        this(markedForDeleteAt, 
Cell.deletionTimeLongToUnsignedInteger(localDeletionTime));
     }
 
     private DeletionTime(long markedForDeleteAt, int 
localDeletionTimeUnsignedInteger)
@@ -116,12 +115,13 @@ public class DeletionTime implements 
Comparable<DeletionTime>, IMeasurableMemory
     }
 
     /**
-     * check if this deletion time is valid - localDeletionTime can never be 
negative
+     * Check if this deletion time is valid - markedForDeleteAt can only 
be negative if the deletion is LIVE.
+     * localDeletionTime is not checked as it is stored as an unsigned int and 
cannot be negative.
      * @return true if it is valid
      */
     public boolean validate()
     {
-        return true;
+        return markedForDeleteAt >= 0 || isLive();
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java 
b/src/java/org/apache/cassandra/db/rows/Cell.java
index 9850d08af3..fffcca821a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -21,12 +21,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.CassandraUInt;
 import org.apache.cassandra.utils.memory.ByteBufferCloner;
@@ -213,6 +215,29 @@ public abstract class Cell<V> extends ColumnData
     
     protected abstract int localDeletionTimeAsUnsignedInt();
 
+    /**
+     * Handle unsigned encoding and potentially invalid localDeletionTime.
+     */
+    public static long decodeLocalDeletionTime(long localDeletionTime, int 
ttl, DeserializationHelper helper)
+    {
+        if (localDeletionTime >= ttl)
+            return localDeletionTime;   // fast path, positive and valid 
signed 32-bit integer
+
+        if (localDeletionTime < 0)
+        {
+            // Overflowed signed int, decode to long. The result is guaranteed 
> ttl (and any signed int)
+            return helper.version < MessagingService.VERSION_50
+                   ? INVALID_DELETION_TIME
+                   : deletionTimeUnsignedIntegerToLong((int) 
localDeletionTime);
+        }
+
+        if (ttl == LivenessInfo.EXPIRED_LIVENESS_TTL)
+            return localDeletionTime;   // ttl is already expired, 
localDeletionTime is valid
+        else
+            return INVALID_DELETION_TIME;  // Invalid as it can't occur 
without corruption and would cause negative
+                                           // timestamp on expiry.
+    }
+
     /**
      * The serialization format for cell is:
      *     [ flags ][ timestamp ][ deletion time ][    ttl    ][ path size ][ 
path ][ value size ][ value ]
@@ -317,11 +342,11 @@ public abstract class Cell<V> extends ColumnData
                 }
             }
 
-            if (localDeletionTime < 0)
-                localDeletionTime = helper.version < 
MessagingService.VERSION_50
-                                    ? INVALID_DELETION_TIME
-                                    : deletionTimeUnsignedIntegerToLong((int) 
localDeletionTime);
-
+            if (timestamp < 0)
+                throw new IOException("Invalid negative timestamp: " + 
timestamp);
+            if (ttl < 0)
+                throw new IOException("Invalid TTL: " + ttl);
+            localDeletionTime = decodeLocalDeletionTime(localDeletionTime, 
ttl, helper);
             return accessor.factory().cell(column, timestamp, ttl, 
localDeletionTime, value, path);
         }
 
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index c1802cf012..cfbcad177c 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -589,12 +589,12 @@ public class UnfilteredSerializer
             if (hasTimestamp)
             {
                 long timestamp = header.readTimestamp(in);
+                assert timestamp >= 0 : "Invalid negative timestamp " + 
timestamp;
                 int ttl = hasTTL ? header.readTTL(in) : LivenessInfo.NO_TTL;
+                assert ttl >= 0 : "Invalid TTL " + ttl;
                 long localDeletionTime = hasTTL ? 
header.readLocalDeletionTime(in) : LivenessInfo.NO_EXPIRATION_TIME;
-                if (localDeletionTime < 0)
-                    localDeletionTime = helper.version < 
MessagingService.VERSION_50
-                                        ? Cell.INVALID_DELETION_TIME
-                                        : 
Cell.deletionTimeUnsignedIntegerToLong((int) localDeletionTime);
+
+                localDeletionTime = 
Cell.decodeLocalDeletionTime(localDeletionTime, ttl, helper);
 
                 rowLiveness = LivenessInfo.withExpirationTime(timestamp, ttl, 
localDeletionTime);
             }
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index b8355fa518..789bc4bd5f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -138,7 +138,7 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
         {
             return iterator.hasNext();
         }
-        catch (IndexOutOfBoundsException | VIntOutOfRangeException e)
+        catch (IndexOutOfBoundsException | VIntOutOfRangeException | 
AssertionError e)
         {
             sstable.markSuspect();
             throw new CorruptSSTableException(e, filename);
@@ -163,7 +163,7 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
         {
             return doCompute();
         }
-        catch (IndexOutOfBoundsException e)
+        catch (IndexOutOfBoundsException | VIntOutOfRangeException | 
AssertionError e)
         {
             sstable.markSuspect();
             throw new CorruptSSTableException(e, filename);
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java 
b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index f1efe97722..16de3f061c 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -798,21 +798,21 @@ public class ReadCommandTest
         long nowInSec = FBUtilities.nowInSeconds();
 
         // A simple tombstone
-        new RowUpdateBuilder(cfs.metadata(), 0, 
keys[0]).clustering("cc").delete("a").build().apply();
+        new RowUpdateBuilder(cfs.metadata(), 100, 
keys[0]).clustering("cc").delete("a").build().apply();
 
         // Collection with an associated complex deletion
-        PartitionUpdate.SimpleBuilder builder = 
PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0);
+        PartitionUpdate.SimpleBuilder builder = 
PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(100);
         builder.row("cc").add("c", ImmutableSet.of("element1", "element2"));
         builder.buildAsMutation().apply();
 
         // RangeTombstone and a row (not covered by the RT). The row contains 
a regular tombstone which will not be
         // purged. This is to prevent the partition from being fully purged 
and removed from the final results
-        new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, 
keys[2]).addRangeTombstone("aa", "bb").build().apply();
+        new RowUpdateBuilder(cfs.metadata(), nowInSec, 100L, 
keys[2]).addRangeTombstone("aa", "bb").build().apply();
         new RowUpdateBuilder(cfs.metadata(), nowInSec+ 1000, 1000L, 
keys[2]).clustering("cc").delete("a").build().apply();
 
         // Partition with 2 rows, one fully deleted
-        new RowUpdateBuilder(cfs.metadata.get(), 0, 
keys[3]).clustering("bb").add("a", 
ByteBufferUtil.bytes("a")).delete("b").build().apply();
-        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply();
+        new RowUpdateBuilder(cfs.metadata.get(), 100, 
keys[3]).clustering("bb").add("a", 
ByteBufferUtil.bytes("a")).delete("b").build().apply();
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 100, keys[3], "cc").apply();
         Util.flush(cfs);
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 
111, null));
 
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
index 26adfb108c..dc78276254 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
@@ -225,7 +225,7 @@ public class CorruptedSSTablesCompactionsTest
             try
             {
                 cfs.forceMajorCompaction();
-                break;
+                break; // After all corrupted sstables are marked as such, 
compaction of the rest should succeed.
             }
             catch (Exception e)
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to