This is an automated email from the ASF dual-hosted git repository. blambov pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit c6385ac3ddccabdc7cb650b090fa69c0523274e8 Merge: 31aede3275 87c2af85c1 Author: Branimir Lambov <branimir.lam...@datastax.com> AuthorDate: Thu Sep 21 16:02:01 2023 +0300 Merge branch 'cassandra-3.11' into cassandra-4.0 CHANGES.txt | 1 + .../db/compaction/CompactionController.java | 12 +- .../db/compaction/CompactionControllerTest.java | 141 ++++++++++++++++++++- 3 files changed, 146 insertions(+), 8 deletions(-) diff --cc CHANGES.txt index 13de2ab602,74755be6e7..6c4e0ef6b0 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,12 -1,7 +1,13 @@@ -3.11.17 +4.0.12 + * Enable 3rd party JDK installations for Debian package (CASSANDRA-18844) + * Fix NTS log message when an unrecognized strategy option is passed (CASSANDRA-18679) + * Fix BulkLoader ignoring cipher suites options (CASSANDRA-18582) + * Migrate Python optparse to argparse (CASSANDRA-17914) +Merged from 3.11: + * Fix delayed SSTable release with unsafe_aggressive_sstable_expiration (CASSANDRA-18756) * Revert CASSANDRA-18543 (CASSANDRA-18854) * Fix NPE when using udfContext in UDF after a restart of a node (CASSANDRA-18739) + * Moved jflex from runtime to build dependencies (CASSANDRA-18664) Merged from 3.0: * Add cqlshrc.sample and credentials.sample into Debian package (CASSANDRA-18818) * Refactor validation logic in StorageService.rebuild (CASSANDRA-18803) diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java index cee2b58f75,06272a1075..0c520d9491 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@@ -73,7 -81,11 +73,9 @@@ public class CompactionController exten public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption) { + //When making changes to the method, be aware that some of the state of the controller may still be uninitialized + //(e.g. TWCS sets up the value of ignoreOverlaps() after this completes) - assert cfs != null; - this.cfs = cfs; - this.gcBefore = gcBefore; + super(cfs, gcBefore, tombstoneOption); this.compacting = compacting; this.limiter = limiter; compactingRepaired = compacting != null && compacting.stream().allMatch(SSTableReader::isRepaired); @@@ -94,18 -107,6 +96,12 @@@ return; } - if (ignoreOverlaps()) - { - logger.debug("not refreshing overlaps - running with ignoreOverlaps activated"); - return; - } - + if (cfs.getNeverPurgeTombstones()) + { + logger.debug("not refreshing overlaps for {}.{} - neverPurgeTombstones is enabled", cfs.keyspace.getName(), cfs.getTableName()); + return; + } + for (SSTableReader reader : overlappingSSTables) { if (reader.isMarkedCompacted()) diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java index 500a88179f,aa95ba56fb..86546bb9f6 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java @@@ -19,13 -19,19 +19,19 @@@ package org.apache.cassandra.db.compaction; import java.nio.ByteBuffer; + import java.util.HashMap; + import java.util.Map; import java.util.Set; + import java.util.concurrent.CountDownLatch; + import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; +import java.util.function.LongPredicate; - import java.util.function.Predicate; + import com.google.common.collect.Iterables; import com.google.common.collect.Sets; + import com.google.common.util.concurrent.Uninterruptibles; import org.junit.BeforeClass; import org.junit.Test; + import org.junit.runner.RunWith; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@@ -183,7 -201,125 +199,125 @@@ public class CompactionControllerTest e assertEquals(1, expired.size()); } + @Test + @BMRules(rules = { + @BMRule(name = "Pause compaction", + targetClass = "CompactionTask", + targetMethod = "runMayThrow", + targetLocation = "INVOKE getCompactionAwareWriter", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.createCompactionControllerLatch.countDown();" + + "com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" + + "(org.apache.cassandra.db.compaction.CompactionControllerTest.compaction2FinishLatch);"), + @BMRule(name = "Check overlaps", + targetClass = "CompactionTask", + targetMethod = "runMayThrow", + targetLocation = "INVOKE finish", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.compaction1RefreshLatch.countDown();" + + "com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" + + "(org.apache.cassandra.db.compaction.CompactionControllerTest.refreshCheckLatch);"), + @BMRule(name = "Increment overlap refresh counter", + targetClass = "ColumnFamilyStore", + targetMethod = "getAndReferenceOverlappingLiveSSTables", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.incrementOverlapRefreshCounter();") + }) + public void testIgnoreOverlaps() throws Exception + { + testOverlapIterator(true); + overlapRefreshCounter = 0; + compaction2FinishLatch = new CountDownLatch(1); + createCompactionControllerLatch = new CountDownLatch(1); + compaction1RefreshLatch = new CountDownLatch(1); + refreshCheckLatch = new CountDownLatch(1); + testOverlapIterator(false); + } + + public void testOverlapIterator(boolean ignoreOverlaps) throws Exception + { + + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + //create 2 overlapping sstables + DecoratedKey key = Util.dk("k1"); + long timestamp1 = FBUtilities.timestampMicros(); + long timestamp2 = timestamp1 - 5; - applyMutation(cfs.metadata, key, timestamp1); ++ applyMutation(cfs.metadata(), key, timestamp1); + cfs.forceBlockingFlush(); + assertEquals(cfs.getLiveSSTables().size(), 1); + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + - applyMutation(cfs.metadata, key, timestamp2); ++ applyMutation(cfs.metadata(), key, timestamp2); + cfs.forceBlockingFlush(); + assertEquals(cfs.getLiveSSTables().size(), 2); + String sstable2 = cfs.getLiveSSTables().iterator().next().getFilename(); + + System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY, "true"); + Map<String, String> options = new HashMap<>(); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30"); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS"); + options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS"); + options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"); + options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, Boolean.toString(ignoreOverlaps)); + TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options); + for (SSTableReader sstable : cfs.getLiveSSTables()) + twcs.addSSTable(sstable); + + twcs.startup(); + + CompactionTask task = (CompactionTask)twcs.getUserDefinedTask(sstables, 0); + + assertNotNull(task); + assertEquals(1, Iterables.size(task.transaction.originals())); + + //start a compaction for the first sstable (compaction1) + //the overlap iterator should contain sstable2 + //this compaction will be paused by the BMRule + Thread t = new Thread(() -> { + task.execute(null); + }); + + //start a compaction for the second sstable (compaction2) + //the overlap iterator should contain sstable1 + //this compaction should complete as normal + Thread t2 = new Thread(() -> { + Uninterruptibles.awaitUninterruptibly(createCompactionControllerLatch); + assertEquals(1, overlapRefreshCounter); + CompactionManager.instance.forceUserDefinedCompaction(sstable2); + + //after compaction2 is finished, wait 1 minute and then resume compaction1 (this gives enough time for the overlapIterator to be refreshed) + //after resuming, the overlap iterator for compaction1 should be updated to include the new sstable created by compaction2, + //and it should not contain sstable2 + try + { + TimeUnit.MINUTES.sleep(1); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + compaction2FinishLatch.countDown(); + }); + + t.setName("compaction1"); + t.start(); + t2.start(); + + compaction1RefreshLatch.await(); + //at this point, the overlap iterator for compaction1 should be refreshed + + //verify that the overlap iterator for compaction1 is refreshed twice, (once during the constructor, and again after compaction2 finishes) + assertEquals(2, overlapRefreshCounter); + + refreshCheckLatch.countDown(); + t.join(); + } + - private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp) + private void applyMutation(TableMetadata cfm, DecoratedKey key, long timestamp) { ByteBuffer val = ByteBufferUtil.bytes(1L); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org