This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 5519df34cb725904ef567e377657cddd580e7c0f Merge: c9fe399b06 e0ac46c5a7 Author: Stefan Miklosovic <[email protected]> AuthorDate: Thu Mar 5 16:12:02 2026 +0100 Merge branch 'cassandra-5.0' into trunk CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionTask.java | 1 + .../db/compaction/CompactionTaskTest.java | 188 ++++++++++++++++++++- .../db/lifecycle/TestableLifecycleTransaction.java | 49 ++++++ 4 files changed, 238 insertions(+), 1 deletion(-) diff --cc CHANGES.txt index 36ad265565,755372a335..7d89df7e56 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -393,26 -113,7 +393,27 @@@ Merged from 4.1 * Optionally skip exception logging on invalid legacy protocol magic exception (CASSANDRA-19483) * Fix SimpleClient ability to release acquired capacity (CASSANDRA-20202) * Fix WaitQueue.Signal.awaitUninterruptibly may block forever if invoking thread is interrupted (CASSANDRA-20084) + * Run audit_logging_options through santiation and validation on startup (CASSANDRA-20208) + * Enforce CQL message size limit on multiframe messages (CASSANDRA-20052) + * Fix race condition in DecayingEstimatedHistogramReservoir during rescale (CASSANDRA-19365) Merged from 4.0: ++ * Obsolete expired SSTables before compaction starts (CASSANDRA-19776) + * Switch lz4-java to at.yawk.lz4 version due to CVE (CASSANDRA-20152) + * Restrict BytesType compatibility to scalar types only (CASSANDRA-20982) + * Backport fix to nodetool gcstats output for direct memory (CASSANDRA-21037) + * ArrayIndexOutOfBoundsException with repaired data tracking and counters (CASSANDRA-20871) + * Fix cleanup of old incremental repair sessions in case of owned token range changes or a table deleting (CASSANDRA-20877) + * Fix memory leak in BufferPoolAllocator when a capacity needs to be extended (CASSANDRA-20753) + * Leveled Compaction doesn't validate maxBytesForLevel when the table is altered/created (CASSANDRA-20570) + * Updated dtest-api to 0.0.18 and removed JMX-related classes that now live in the dtest-api 
(CASSANDRA-20884) + * Fixed incorrect error message constant for keyspace name length validation (CASSANDRA-20915) + * Prevent too long table names not fitting file names (CASSANDRA-20389) + * Update Jackson to 2.19.2 (CASSANDRA-20848) + * Update commons-lang3 to 3.18.0 (CASSANDRA-20849) + * Add NativeTransportMaxConcurrentConnectionsPerIp to StorageProxyMBean (CASSANDRA-20642) + * Make secondary index implementations notified about rows in fully expired SSTables in compaction (CASSANDRA-20829) + * Ensure prepared_statement INSERT timestamp precedes eviction DELETE (CASSANDRA-19703) + * Gossip doesn't converge due to race condition when updating EndpointStates multiple fields (CASSANDRA-20659) * Handle sstable metadata stats file getting a new mtime after compaction has finished (CASSANDRA-18119) * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532) * Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. (CASSANDRA-20561) diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 15f317b565,9566ef8430..7336c4543a --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -183,24 -143,12 +183,25 @@@ public class CompactionTask extends Abs final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables(); + maybeNotifyIndexersAboutRowsInFullyExpiredSSTables(fullyExpiredSSTables); + + if (!fullyExpiredSSTables.isEmpty()) + { + logger.debug("Compaction {} dropping expired sstables: {}", transaction.opIdString(), fullyExpiredSSTables); ++ fullyExpiredSSTables.forEach(transaction::obsolete); + actuallyCompact.removeAll(fullyExpiredSSTables); + } + TimeUUID taskId = transaction.opId(); // select SSTables to compact based on available disk space. 
- if (!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId)) + final boolean hasExpirations = !fullyExpiredSSTables.isEmpty(); + if ((shouldReduceScopeForSpace() && !buildCompactionCandidatesForAvailableDiskSpace(actuallyCompact, hasExpirations, taskId)) + || hasExpirations) { // The set of sstables has changed (one or more were excluded due to limited available disk space). - // We need to recompute the overlaps between sstables. + // We need to recompute the overlaps between sstables. The iterators used in the compaction controller + // and tracker will reflect the changed set of sstables made by LifecycleTransaction.cancel(), + // so refreshing the overlaps will be based on the updated set of sstables. controller.refreshOverlaps(); } diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java index a69bb5ff34,35763140e6..1a7c73d566 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java @@@ -21,9 -21,10 +21,11 @@@ package org.apache.cassandra.db.compact import java.io.IOException; import java.util.ArrayList; import java.util.Collections; + import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; + import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Before; @@@ -36,9 -38,15 +39,17 @@@ import org.apache.cassandra.cql3.QueryP import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SystemKeyspace; + import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; + import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; ++import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import 
org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.lifecycle.TestableLifecycleTransaction; + import org.apache.cassandra.db.lifecycle.View; ++import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction; +import org.apache.cassandra.db.marshal.UTF8Type; + import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; @@@ -142,6 -155,175 +162,172 @@@ public class CompactionTaskTes Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state()); } + /** + * Test that even if some SSTables are fully expired, we can still select and reference them + * while they are part of compaction. + * + * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19776">CASSANDRA-19776</a> + */ + @Test + public void testFullyExpiredSSTablesAreNotReleasedPrematurely() + { + Assert.assertEquals(0, gcGraceCfs.getLiveSSTables().size()); + gcGraceCfs.getCompactionStrategyManager().disable(); + + // Use large SSTables (10+ MiB) so that switching to new output SSTables happens during + // compaction. Without large enough SSTables, the output fits in one SSTable and no switch happens + int numKeys = 5000; + String data = "x".repeat(2048); // ~2KB padding per row + + // Similar technique to get fully expired SSTables as in TTLExpiryTest#testAggressiveFullyExpired + // SSTable 1 (will be fully expired - superseded by SSTable 2) + for (int k = 0; k < numKeys; k++) + { + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, data) VALUES (?, 1, ?) USING TIMESTAMP 1 AND TTL 1", k, data); + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, data) VALUES (?, 1, ?) 
USING TIMESTAMP 3 AND TTL 1", k, data); + } + Util.flush(gcGraceCfs); + + // SSTable 2 (will be fully expired - superseded by SSTables 3 and 4) + for (int k = 0; k < numKeys; k++) + { + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, data) VALUES (?, 1, ?) USING TIMESTAMP 2 AND TTL 1", k, data); + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, data) VALUES (?, 1, ?) USING TIMESTAMP 5 AND TTL 1", k, data); + } + Util.flush(gcGraceCfs); + + Set<SSTableReader> toBeObsolete = new HashSet<>(gcGraceCfs.getLiveSSTables()); + Assert.assertEquals(2, toBeObsolete.size()); + + // SSTable 3 (not fully expired) + for (int k = 0; k < numKeys; k++) + { + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, data) VALUES (?, 1, ?) USING TIMESTAMP 4 AND TTL 1", k, data); + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col3, data) VALUES (?, 1, ?) USING TIMESTAMP 7 AND TTL 1", k, data); + } + Util.flush(gcGraceCfs); + + // SSTable 4 (not fully expired - col3 has longer TTL) + for (int k = 0; k < numKeys; k++) + { + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col3, data) VALUES (?, 1, ?) USING TIMESTAMP 6 AND TTL 3", k, data); + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, data) VALUES (?, 1, ?) USING TIMESTAMP 8 AND TTL 1", k, data); + } + Util.flush(gcGraceCfs); + + Set<SSTableReader> sstables = gcGraceCfs.getLiveSSTables(); + Assert.assertEquals(4, sstables.size()); + + // Enable preemptive opening so that SSTableRewriter.switchWriter() calls checkpoint(). 
+ int originalPreemptiveOpenInterval = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMiB(); + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(2); + + // collector of stacktraces to later check if checkpoint was called in switchWriter + final List<StackTraceElement[]> stacks = new ArrayList<>(); + + try + { + AtomicInteger checkpointCount = new AtomicInteger(0); + - // Hook into transaction's checkpoint and doCommit methods to verify that after ++ // Hook into transaction's checkpoint and commit methods to verify that after + // checkpointing, all SSTables (including fully expired ones) remain referenceable. - // We use TestableLifecycleTransaction because in cassandra-5.0, AbstractCompactionTask.transaction - // is LifecycleTransaction (concrete), not ILifecycleTransaction (interface), so - // WrappedLifecycleTransaction cannot be used with CompactionTask directly. - LifecycleTransaction txn = new TestableLifecycleTransaction(gcGraceCfs.getTracker(), OperationType.COMPACTION, sstables) ++ ILifecycleTransaction txn = new WrappedLifecycleTransaction(gcGraceCfs.getTracker().tryModify(sstables, OperationType.COMPACTION)) + { + @Override + public void checkpoint() + { + stacks.add(Thread.currentThread().getStackTrace()); + + for (SSTableReader r : toBeObsolete) + Assert.assertTrue(this.isObsolete(r)); + + assertAllSSTablesAreReferenceable(); + + super.checkpoint(); + + // This is the critical assertion: after checkpoint(), all SSTables in the + // CANONICAL view must still be referenceable. Before the fix, fully expired + // SSTables would lose their references here, causing selectAndReference() to + // spin loop (CASSANDRA-19776). 
+ assertAllSSTablesAreReferenceable(); + + checkpointCount.incrementAndGet(); + } + + @Override - public Throwable doCommit(Throwable accumulate) ++ public Throwable commit(Throwable accumulate) + { + assertAllSSTablesAreReferenceable(); - return super.doCommit(accumulate); ++ return super.commit(accumulate); + } + + private void assertAllSSTablesAreReferenceable() + { + // This simulates what EstimatedPartitionCount metric and similar code paths do. + // It is crucial that tryRef does not return null; a null result means some SSTables + // are not referenceable, which would cause selectAndReference() to spin loop. + ColumnFamilyStore.ViewFragment view = gcGraceCfs.select(View.selectFunction(SSTableSet.CANONICAL)); + Refs<SSTableReader> refs = Refs.tryRef(view.sstables); + Assert.assertNotNull("Some SSTables in CANONICAL view are not referenceable (CASSANDRA-19776)", refs); + refs.close(); + } + }; + + // Use MaxSSTableSizeWriter with a small max size (2 MiB) to force output sstable switches during compaction. 
+ long maxSSTableSize = 2L * 1024 * 1024; // 2 MiB + CompactionTask task = new CompactionTask(gcGraceCfs, txn, FBUtilities.nowInSeconds() + 2) + { + @Override + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, - LifecycleTransaction transaction, ++ ILifecycleTransaction transaction, + Set<SSTableReader> nonExpiredSSTables) + { + return new MaxSSTableSizeWriter(cfs, directories, transaction, nonExpiredSSTables, + maxSSTableSize, 0); + } + }; + - try (CompactionController compactionController = task.getCompactionController(txn.originals())) ++ try (CompactionController compactionController = task.getCompactionController(task.inputSSTables())) + { + Set<SSTableReader> fullyExpiredSSTables = compactionController.getFullyExpiredSSTables(); + Assert.assertEquals(2, fullyExpiredSSTables.size()); + task.execute(null); + } + + // Verify that checkpoint was called more than once, proving that output sstable switching + // happened during compaction. Without MaxSSTableSizeWriter and large enough SSTables, + // checkpoint would only be called at the end, which would not exercise the CASSANDRA-19776 fix. + Assert.assertTrue("Expected checkpoint() to be called more than once during compaction, but was called " + + checkpointCount.get() + " time(s). 
Output sstable switching did not occur.", + checkpointCount.get() > 1); + } + finally + { + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(originalPreemptiveOpenInterval); + } + + Assert.assertFalse(stacks.isEmpty()); + + boolean checkpointCalledInSSTableRewriter = false; + + for (int i = 0; i < stacks.size(); i++) + { + for (StackTraceElement element : stacks.get(i)) + { + if (element.getClassName().equals(SSTableRewriter.class.getName()) + && (element.getMethodName().equals("switchWriter") + || element.getMethodName().equals("maybeReopenEarly"))) + { + checkpointCalledInSSTableRewriter = true; + break; + } + } + } + + Assert.assertTrue(checkpointCalledInSSTableRewriter); + } + private static void mutateRepaired(SSTableReader sstable, long repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException { sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
