This is an automated email from the ASF dual-hosted git repository.

blambov pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c6385ac3ddccabdc7cb650b090fa69c0523274e8
Merge: 31aede3275 87c2af85c1
Author: Branimir Lambov <branimir.lam...@datastax.com>
AuthorDate: Thu Sep 21 16:02:01 2023 +0300

    Merge branch 'cassandra-3.11' into cassandra-4.0

 CHANGES.txt                                        |   1 +
 .../db/compaction/CompactionController.java        |  12 +-
 .../db/compaction/CompactionControllerTest.java    | 141 ++++++++++++++++++++-
 3 files changed, 146 insertions(+), 8 deletions(-)

diff --cc CHANGES.txt
index 13de2ab602,74755be6e7..6c4e0ef6b0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,7 +1,13 @@@
 -3.11.17
 +4.0.12
 + * Enable 3rd party JDK installations for Debian package (CASSANDRA-18844)
 + * Fix NTS log message when an unrecognized strategy option is passed (CASSANDRA-18679)
 + * Fix BulkLoader ignoring cipher suites options (CASSANDRA-18582)
 + * Migrate Python optparse to argparse (CASSANDRA-17914)
 +Merged from 3.11:
+  * Fix delayed SSTable release with unsafe_aggressive_sstable_expiration (CASSANDRA-18756)
   * Revert CASSANDRA-18543 (CASSANDRA-18854)
   * Fix NPE when using udfContext in UDF after a restart of a node (CASSANDRA-18739)
 + * Moved jflex from runtime to build dependencies (CASSANDRA-18664)
  Merged from 3.0:
   * Add cqlshrc.sample and credentials.sample into Debian package (CASSANDRA-18818)
   * Refactor validation logic in StorageService.rebuild (CASSANDRA-18803)
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index cee2b58f75,06272a1075..0c520d9491
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -73,7 -81,11 +73,9 @@@ public class CompactionController exten
  
      public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> 
compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption)
      {
+         //When making changes to the method, be aware that some of the state of the controller may still be uninitialized
+         //(e.g. TWCS sets up the value of ignoreOverlaps() after this completes)
 -        assert cfs != null;
 -        this.cfs = cfs;
 -        this.gcBefore = gcBefore;
 +        super(cfs, gcBefore, tombstoneOption);
          this.compacting = compacting;
          this.limiter = limiter;
          compactingRepaired = compacting != null && 
compacting.stream().allMatch(SSTableReader::isRepaired);
@@@ -94,18 -107,6 +96,12 @@@
              return;
          }
  
-         if (ignoreOverlaps())
-         {
-             logger.debug("not refreshing overlaps - running with 
ignoreOverlaps activated");
-             return;
-         }
- 
 +        if (cfs.getNeverPurgeTombstones())
 +        {
 +            logger.debug("not refreshing overlaps for {}.{} - 
neverPurgeTombstones is enabled", cfs.keyspace.getName(), cfs.getTableName());
 +            return;
 +        }
 +
          for (SSTableReader reader : overlappingSSTables)
          {
              if (reader.isMarkedCompacted())
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 500a88179f,aa95ba56fb..86546bb9f6
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -19,13 -19,19 +19,19 @@@
  package org.apache.cassandra.db.compaction;
  
  import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
  import java.util.Set;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
 -import java.util.function.Predicate;
 +import java.util.function.LongPredicate;
- import java.util.function.Predicate;
  
+ import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.BeforeClass;
  import org.junit.Test;
+ import org.junit.runner.RunWith;
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
@@@ -183,7 -201,125 +199,125 @@@ public class CompactionControllerTest e
          assertEquals(1, expired.size());
      }
  
+     @Test
+     @BMRules(rules = {
+     @BMRule(name = "Pause compaction",
+     targetClass = "CompactionTask",
+     targetMethod = "runMayThrow",
+     targetLocation = "INVOKE getCompactionAwareWriter",
+     condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+     action = 
"org.apache.cassandra.db.compaction.CompactionControllerTest.createCompactionControllerLatch.countDown();"
 +
+              
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+              
"(org.apache.cassandra.db.compaction.CompactionControllerTest.compaction2FinishLatch);"),
+     @BMRule(name = "Check overlaps",
+     targetClass = "CompactionTask",
+     targetMethod = "runMayThrow",
+     targetLocation = "INVOKE finish",
+     condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+     action = 
"org.apache.cassandra.db.compaction.CompactionControllerTest.compaction1RefreshLatch.countDown();"
 +
+              
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+              
"(org.apache.cassandra.db.compaction.CompactionControllerTest.refreshCheckLatch);"),
+     @BMRule(name = "Increment overlap refresh counter",
+     targetClass = "ColumnFamilyStore",
+     targetMethod = "getAndReferenceOverlappingLiveSSTables",
+     condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+     action = 
"org.apache.cassandra.db.compaction.CompactionControllerTest.incrementOverlapRefreshCounter();")
+     })
+     public void testIgnoreOverlaps() throws Exception
+     {
+         testOverlapIterator(true);
+         overlapRefreshCounter = 0;
+         compaction2FinishLatch = new CountDownLatch(1);
+         createCompactionControllerLatch = new CountDownLatch(1);
+         compaction1RefreshLatch = new CountDownLatch(1);
+         refreshCheckLatch = new CountDownLatch(1);
+         testOverlapIterator(false);
+     }
+ 
+     public void testOverlapIterator(boolean ignoreOverlaps) throws Exception
+     {
+ 
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+         cfs.truncateBlocking();
+         cfs.disableAutoCompaction();
+ 
+         //create 2 overlapping sstables
+         DecoratedKey key = Util.dk("k1");
+         long timestamp1 = FBUtilities.timestampMicros();
+         long timestamp2 = timestamp1 - 5;
 -        applyMutation(cfs.metadata, key, timestamp1);
++        applyMutation(cfs.metadata(), key, timestamp1);
+         cfs.forceBlockingFlush();
+         assertEquals(cfs.getLiveSSTables().size(), 1);
+         Set<SSTableReader> sstables = cfs.getLiveSSTables();
+ 
 -        applyMutation(cfs.metadata, key, timestamp2);
++        applyMutation(cfs.metadata(), key, timestamp2);
+         cfs.forceBlockingFlush();
+         assertEquals(cfs.getLiveSSTables().size(), 2);
+         String sstable2 = 
cfs.getLiveSSTables().iterator().next().getFilename();
+ 
+         
System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY,
 "true");
+         Map<String, String> options = new HashMap<>();
+         
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, 
"30");
+         
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"SECONDS");
+         
options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, 
"MILLISECONDS");
+         
options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY,
 "0");
+         
options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY,
 Boolean.toString(ignoreOverlaps));
+         TimeWindowCompactionStrategy twcs = new 
TimeWindowCompactionStrategy(cfs, options);
+         for (SSTableReader sstable : cfs.getLiveSSTables())
+             twcs.addSSTable(sstable);
+ 
+         twcs.startup();
+ 
+         CompactionTask task = 
(CompactionTask)twcs.getUserDefinedTask(sstables, 0);
+ 
+         assertNotNull(task);
+         assertEquals(1, Iterables.size(task.transaction.originals()));
+ 
+         //start a compaction for the first sstable (compaction1)
+         //the overlap iterator should contain sstable2
+         //this compaction will be paused by the BMRule
+         Thread t = new Thread(() -> {
+             task.execute(null);
+         });
+ 
+         //start a compaction for the second sstable (compaction2)
+         //the overlap iterator should contain sstable1
+         //this compaction should complete as normal
+         Thread t2 = new Thread(() -> {
+             
Uninterruptibles.awaitUninterruptibly(createCompactionControllerLatch);
+             assertEquals(1, overlapRefreshCounter);
+             CompactionManager.instance.forceUserDefinedCompaction(sstable2);
+ 
+             //after compaction2 is finished, wait 1 minute and then resume compaction1 (this gives enough time for the overlapIterator to be refreshed)
+             //after resuming, the overlap iterator for compaction1 should be updated to include the new sstable created by compaction2,
+             //and it should not contain sstable2
+             try
+             {
+                 TimeUnit.MINUTES.sleep(1);
+             }
+             catch (InterruptedException e)
+             {
+                 throw new RuntimeException(e);
+             }
+             compaction2FinishLatch.countDown();
+         });
+ 
+         t.setName("compaction1");
+         t.start();
+         t2.start();
+ 
+         compaction1RefreshLatch.await();
+         //at this point, the overlap iterator for compaction1 should be refreshed
+ 
+         //verify that the overlap iterator for compaction1 is refreshed twice, (once during the constructor, and again after compaction2 finishes)
+         assertEquals(2, overlapRefreshCounter);
+ 
+         refreshCheckLatch.countDown();
+         t.join();
+     }
+ 
 -    private void applyMutation(CFMetaData cfm, DecoratedKey key, long 
timestamp)
 +    private void applyMutation(TableMetadata cfm, DecoratedKey key, long 
timestamp)
      {
          ByteBuffer val = ByteBufferUtil.bytes(1L);
  


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to