This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 5519df34cb725904ef567e377657cddd580e7c0f
Merge: c9fe399b06 e0ac46c5a7
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Thu Mar 5 16:12:02 2026 +0100

    Merge branch 'cassandra-5.0' into trunk

 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java    |   1 +
 .../db/compaction/CompactionTaskTest.java          | 188 ++++++++++++++++++++-
 .../db/lifecycle/TestableLifecycleTransaction.java |  49 ++++++
 4 files changed, 238 insertions(+), 1 deletion(-)

diff --cc CHANGES.txt
index 36ad265565,755372a335..7d89df7e56
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -393,26 -113,7 +393,27 @@@ Merged from 4.1
   * Optionally skip exception logging on invalid legacy protocol magic 
exception (CASSANDRA-19483)
   * Fix SimpleClient ability to release acquired capacity (CASSANDRA-20202)
   * Fix WaitQueue.Signal.awaitUninterruptibly may block forever if invoking 
thread is interrupted (CASSANDRA-20084)
 + * Run audit_logging_options through sanitization and validation on startup 
(CASSANDRA-20208)
 + * Enforce CQL message size limit on multiframe messages (CASSANDRA-20052)
 + * Fix race condition in DecayingEstimatedHistogramReservoir during rescale 
(CASSANDRA-19365)
  Merged from 4.0:
++ * Obsolete expired SSTables before compaction starts (CASSANDRA-19776)
 + * Switch lz4-java to at.yawk.lz4 version due to CVE (CASSANDRA-20152)
 + * Restrict BytesType compatibility to scalar types only (CASSANDRA-20982)
 + * Backport fix to nodetool gcstats output for direct memory (CASSANDRA-21037)
 + * ArrayIndexOutOfBoundsException with repaired data tracking and counters 
(CASSANDRA-20871)
 + * Fix cleanup of old incremental repair sessions in case of owned token 
range changes or a table deletion (CASSANDRA-20877)
 + * Fix memory leak in BufferPoolAllocator when a capacity needs to be 
extended (CASSANDRA-20753)
 + * Leveled Compaction doesn't validate maxBytesForLevel when the table is 
altered/created (CASSANDRA-20570)
 + * Updated dtest-api to 0.0.18 and removed JMX-related classes that now live 
in the dtest-api (CASSANDRA-20884)
 + * Fixed incorrect error message constant for keyspace name length validation 
(CASSANDRA-20915)
 + * Prevent too long table names not fitting file names (CASSANDRA-20389)
 + * Update Jackson to 2.19.2 (CASSANDRA-20848)
 + * Update commons-lang3 to 3.18.0 (CASSANDRA-20849)
 + * Add NativeTransportMaxConcurrentConnectionsPerIp to StorageProxyMBean 
(CASSANDRA-20642)
 + * Make secondary index implementations notified about rows in fully expired 
SSTables in compaction (CASSANDRA-20829)
 + * Ensure prepared_statement INSERT timestamp precedes eviction DELETE 
(CASSANDRA-19703)
 + * Gossip doesn't converge due to race condition when updating EndpointStates 
multiple fields (CASSANDRA-20659)
   * Handle sstable metadata stats file getting a new mtime after compaction 
has finished (CASSANDRA-18119)
   * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532)
   * Updating a column with a new TTL but same expiration time is 
non-deterministic and causes repair mismatches. (CASSANDRA-20561)
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 15f317b565,9566ef8430..7336c4543a
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -183,24 -143,12 +183,25 @@@ public class CompactionTask extends Abs
  
              final Set<SSTableReader> fullyExpiredSSTables = 
controller.getFullyExpiredSSTables();
  
 +            
maybeNotifyIndexersAboutRowsInFullyExpiredSSTables(fullyExpiredSSTables);
 +
 +            if (!fullyExpiredSSTables.isEmpty())
 +            {
 +                logger.debug("Compaction {} dropping expired sstables: {}", 
transaction.opIdString(), fullyExpiredSSTables);
++                fullyExpiredSSTables.forEach(transaction::obsolete);
 +                actuallyCompact.removeAll(fullyExpiredSSTables);
 +            }
 +
              TimeUUID taskId = transaction.opId();
              // select SSTables to compact based on available disk space.
 -            if 
(!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId))
 +            final boolean hasExpirations = !fullyExpiredSSTables.isEmpty();
 +            if ((shouldReduceScopeForSpace() && 
!buildCompactionCandidatesForAvailableDiskSpace(actuallyCompact, 
hasExpirations, taskId))
 +                || hasExpirations)
              {
                  // The set of sstables has changed (one or more were excluded 
due to limited available disk space).
 -                // We need to recompute the overlaps between sstables.
 +                // We need to recompute the overlaps between sstables. The 
iterators used in the compaction controller
 +                // and tracker will reflect the changed set of sstables made 
by LifecycleTransaction.cancel(),
 +                // so refreshing the overlaps will be based on the updated 
set of sstables.
                  controller.refreshOverlaps();
              }
  
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
index a69bb5ff34,35763140e6..1a7c73d566
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
@@@ -21,9 -21,10 +21,11 @@@ package org.apache.cassandra.db.compact
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collections;
+ import java.util.HashSet;
  import java.util.List;
 +import java.util.Map;
  import java.util.Set;
+ import java.util.concurrent.atomic.AtomicInteger;
  
  import org.junit.Assert;
  import org.junit.Before;
@@@ -36,9 -38,15 +39,17 @@@ import org.apache.cassandra.cql3.QueryP
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
  import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories;
  import org.apache.cassandra.db.SystemKeyspace;
+ import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+ import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
++import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.SSTableSet;
 -import org.apache.cassandra.db.lifecycle.TestableLifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.View;
++import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction;
 +import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.io.sstable.SSTableRewriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.schema.Schema;
@@@ -142,6 -155,175 +162,172 @@@ public class CompactionTaskTes
          
Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, 
txn.state());
      }
  
+     /**
+      * Test that even if some SSTables are fully expired, we can still select 
and reference them
+      * while they are part of compaction.
+      *
+      * @see <a 
href="https://issues.apache.org/jira/browse/CASSANDRA-19776">CASSANDRA-19776</a>
+      */
+     @Test
+     public void testFullyExpiredSSTablesAreNotReleasedPrematurely()
+     {
+         Assert.assertEquals(0, gcGraceCfs.getLiveSSTables().size());
+         gcGraceCfs.getCompactionStrategyManager().disable();
+ 
+         // Use large SSTables (10+ MiB) so that switching to new output 
SSTables happens during
+         // compaction. Without large enough SSTables, the output fits in one 
SSTable and no switch happens
+         int numKeys = 5000;
+         String data = "x".repeat(2048); // ~2KB padding per row
+ 
+         // Similar technique to get fully expired SSTables as in 
TTLExpiryTest#testAggressiveFullyExpired
+         // SSTable 1 (will be fully expired - superseded by SSTable 2)
+         for (int k = 0; k < numKeys; k++)
+         {
+             QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, 
data) VALUES (?, 1, ?) USING TIMESTAMP 1 AND TTL 1", k, data);
+             QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, 
data) VALUES (?, 1, ?) USING TIMESTAMP 3 AND TTL 1", k, data);
+         }
+         Util.flush(gcGraceCfs);
+ 
+         // SSTable 2 (will be fully expired - superseded by SSTables 3 and 4)
+         for (int k = 0; k < numKeys; k++)
+         {
+             QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, 
data) VALUES (?, 1, ?) USING TIMESTAMP 2 AND TTL 1", k, data);
+             QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, 
data) VALUES (?, 1, ?) USING TIMESTAMP 5 AND TTL 1", k, data);
+         }
+         Util.flush(gcGraceCfs);
+ 
+         Set<SSTableReader> toBeObsolete = new 
HashSet<>(gcGraceCfs.getLiveSSTables());
+         Assert.assertEquals(2, toBeObsolete.size());
+ 
+         // SSTable 3 (not fully expired)
+         for (int k = 0; k < numKeys; k++)
+         {
+             QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, 
data) VALUES (?, 1, ?) USING TIMESTAMP 4 AND TTL 1", k, data);
+             QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col3, 
data) VALUES (?, 1, ?) USING TIMESTAMP 7 AND TTL 1", k, data);
+         }
+         Util.flush(gcGraceCfs);
+ 
+         // SSTable 4 (not fully expired - col3 has longer TTL)
+         for (int k = 0; k < numKeys; k++)
+         {
+             QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col3, 
data) VALUES (?, 1, ?) USING TIMESTAMP 6 AND TTL 3", k, data);
+             QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, 
data) VALUES (?, 1, ?) USING TIMESTAMP 8 AND TTL 1", k, data);
+         }
+         Util.flush(gcGraceCfs);
+ 
+         Set<SSTableReader> sstables = gcGraceCfs.getLiveSSTables();
+         Assert.assertEquals(4, sstables.size());
+ 
+         // Enable preemptive opening so that SSTableRewriter.switchWriter() 
calls checkpoint().
+         int originalPreemptiveOpenInterval = 
DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMiB();
+         DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(2);
+ 
+         // collector of stacktraces to later check if checkpoint was called 
in switchWriter
+         final List<StackTraceElement[]> stacks = new ArrayList<>();
+ 
+         try
+         {
+             AtomicInteger checkpointCount = new AtomicInteger(0);
+ 
 -            // Hook into transaction's checkpoint and doCommit methods to 
verify that after
++            // Hook into transaction's checkpoint and commit methods to 
verify that after
+             // checkpointing, all SSTables (including fully expired ones) 
remain referenceable.
 -            // We use TestableLifecycleTransaction because in cassandra-5.0, 
AbstractCompactionTask.transaction
 -            // is LifecycleTransaction (concrete), not ILifecycleTransaction 
(interface), so
 -            // WrappedLifecycleTransaction cannot be used with CompactionTask 
directly.
 -            LifecycleTransaction txn = new 
TestableLifecycleTransaction(gcGraceCfs.getTracker(), OperationType.COMPACTION, 
sstables)
++            ILifecycleTransaction txn = new 
WrappedLifecycleTransaction(gcGraceCfs.getTracker().tryModify(sstables, 
OperationType.COMPACTION))
+             {
+                 @Override
+                 public void checkpoint()
+                 {
+                     stacks.add(Thread.currentThread().getStackTrace());
+ 
+                     for (SSTableReader r : toBeObsolete)
+                         Assert.assertTrue(this.isObsolete(r));
+ 
+                     assertAllSSTablesAreReferenceable();
+ 
+                     super.checkpoint();
+ 
+                     // This is the critical assertion: after checkpoint(), 
all SSTables in the
+                     // CANONICAL view must still be referenceable. Before the 
fix, fully expired
+                     // SSTables would lose their references here, causing 
selectAndReference() to
+                     // spin loop (CASSANDRA-19776).
+                     assertAllSSTablesAreReferenceable();
+ 
+                     checkpointCount.incrementAndGet();
+                 }
+ 
+                 @Override
 -                public Throwable doCommit(Throwable accumulate)
++                public Throwable commit(Throwable accumulate)
+                 {
+                     assertAllSSTablesAreReferenceable();
 -                    return super.doCommit(accumulate);
++                    return super.commit(accumulate);
+                 }
+ 
+                 private void assertAllSSTablesAreReferenceable()
+                 {
+                     // This simulates what EstimatedPartitionCount metric and 
similar code paths do.
+                     // It is crucial that tryRef does not return null; a null 
result means some SSTables
+                     // are not referenceable, which would cause 
selectAndReference() to spin loop.
+                     ColumnFamilyStore.ViewFragment view = 
gcGraceCfs.select(View.selectFunction(SSTableSet.CANONICAL));
+                     Refs<SSTableReader> refs = Refs.tryRef(view.sstables);
+                     Assert.assertNotNull("Some SSTables in CANONICAL view are 
not referenceable (CASSANDRA-19776)", refs);
+                     refs.close();
+                 }
+             };
+ 
+             // Use MaxSSTableSizeWriter with a small max size (2 MiB) to 
force output sstable switches during compaction.
+             long maxSSTableSize = 2L * 1024 * 1024; // 2 MiB
+             CompactionTask task = new CompactionTask(gcGraceCfs, txn, 
FBUtilities.nowInSeconds() + 2)
+             {
+                 @Override
+                 public CompactionAwareWriter 
getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                                       
Directories directories,
 -                                                                      
LifecycleTransaction transaction,
++                                                                      
ILifecycleTransaction transaction,
+                                                                       
Set<SSTableReader> nonExpiredSSTables)
+                 {
+                     return new MaxSSTableSizeWriter(cfs, directories, 
transaction, nonExpiredSSTables,
+                                                     maxSSTableSize, 0);
+                 }
+             };
+ 
 -            try (CompactionController compactionController = 
task.getCompactionController(txn.originals()))
++            try (CompactionController compactionController = 
task.getCompactionController(task.inputSSTables()))
+             {
+                 Set<SSTableReader> fullyExpiredSSTables = 
compactionController.getFullyExpiredSSTables();
+                 Assert.assertEquals(2, fullyExpiredSSTables.size());
+                 task.execute(null);
+             }
+ 
+             // Verify that checkpoint was called more than once, proving that 
output sstable switching
+             // happened during compaction. Without MaxSSTableSizeWriter and 
large enough SSTables,
+             // checkpoint would only be called at the end, which would not 
exercise the CASSANDRA-19776 fix.
+             Assert.assertTrue("Expected checkpoint() to be called more than 
once during compaction, but was called "
+                               + checkpointCount.get() + " time(s). Output 
sstable switching did not occur.",
+                               checkpointCount.get() > 1);
+         }
+         finally
+         {
+             
DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(originalPreemptiveOpenInterval);
+         }
+ 
+         Assert.assertFalse(stacks.isEmpty());
+ 
+         boolean checkpointCalledInSSTableRewriter = false;
+ 
+         for (int i = 0; i < stacks.size(); i++)
+         {
+             for (StackTraceElement element : stacks.get(i))
+             {
+                 if 
(element.getClassName().equals(SSTableRewriter.class.getName())
+                     && (element.getMethodName().equals("switchWriter")
+                         || 
element.getMethodName().equals("maybeReopenEarly")))
+                 {
+                     checkpointCalledInSSTableRewriter = true;
+                     break;
+                 }
+             }
+         }
+ 
+         Assert.assertTrue(checkpointCalledInSSTableRewriter);
+     }
+ 
      private static void mutateRepaired(SSTableReader sstable, long 
repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException
      {
          
sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor,
 repairedAt, pendingRepair, isTransient);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to