This is an automated email from the ASF dual-hosted git repository.
blambov pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new 87c2af85c1 Fix delayed SSTable release with
unsafe_aggressive_sstable_expiration
87c2af85c1 is described below
commit 87c2af85c1305c130af7d66f83dec03a1c4a8bb2
Author: Ethan Brown <[email protected]>
AuthorDate: Fri Aug 18 13:02:15 2023 -0700
Fix delayed SSTable release with unsafe_aggressive_sstable_expiration
patch by Ethan Brown; reviewed by Branimir Lambov and Mick Semb Wever for
CASSANDRA-18756
---
CHANGES.txt | 1 +
.../db/compaction/CompactionController.java | 12 +-
.../db/compaction/CompactionControllerTest.java | 140 +++++++++++++++++++++
3 files changed, 146 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2d9e2059e1..74755be6e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.17
+ * Fix delayed SSTable release with unsafe_aggressive_sstable_expiration
(CASSANDRA-18756)
* Revert CASSANDRA-18543 (CASSANDRA-18854)
* Fix NPE when using udfContext in UDF after a restart of a node
(CASSANDRA-18739)
Merged from 3.0:
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 19318ff1a9..06272a1075 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -81,6 +81,8 @@ public class CompactionController implements AutoCloseable
public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader>
compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption)
{
+ //When making changes to the method, be aware that some of the state
of the controller may still be uninitialized
+ //(e.g. TWCS sets up the value of ignoreOverlaps() after this
completes)
assert cfs != null;
this.cfs = cfs;
this.gcBefore = gcBefore;
@@ -105,12 +107,6 @@ public class CompactionController implements AutoCloseable
return;
}
- if (ignoreOverlaps())
- {
- logger.debug("not refreshing overlaps - running with
ignoreOverlaps activated");
- return;
- }
-
for (SSTableReader reader : overlappingSSTables)
{
if (reader.isMarkedCompacted())
@@ -129,7 +125,7 @@ public class CompactionController implements AutoCloseable
if (this.overlappingSSTables != null)
close();
- if (compacting == null || ignoreOverlaps())
+ if (compacting == null)
overlappingSSTables =
Refs.tryRef(Collections.<SSTableReader>emptyList());
else
overlappingSSTables =
cfs.getAndReferenceOverlappingLiveSSTables(compacting);
@@ -358,6 +354,8 @@ public class CompactionController implements AutoCloseable
* This strategy can retain for a long time a lot of sstables on disk (see
CASSANDRA-13418) so this option
* control whether or not this check should be ignored.
*
+ * Do NOT call this method in the CompactionController constructor
+ *
* @return false by default
*/
protected boolean ignoreOverlaps()
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index 052206e685..aa95ba56fb 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@ -19,12 +19,19 @@
package org.apache.cassandra.db.compaction;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
@@ -41,17 +48,27 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
+@RunWith(BMUnitRunner.class)
public class CompactionControllerTest extends SchemaLoader
{
private static final String KEYSPACE = "CompactionControllerTest";
private static final String CF1 = "Standard1";
private static final String CF2 = "Standard2";
+ private static final int TTL_SECONDS = 10;
+ private static CountDownLatch compaction2FinishLatch = new
CountDownLatch(1);
+ private static CountDownLatch createCompactionControllerLatch = new
CountDownLatch(1);
+ private static CountDownLatch compaction1RefreshLatch = new
CountDownLatch(1);
+ private static CountDownLatch refreshCheckLatch = new CountDownLatch(1);
+ private static int overlapRefreshCounter = 0;
@BeforeClass
public static void defineSchema() throws ConfigurationException
@@ -184,6 +201,124 @@ public class CompactionControllerTest extends SchemaLoader
assertEquals(1, expired.size());
}
+ @Test
+ @BMRules(rules = {
+ @BMRule(name = "Pause compaction",
+ targetClass = "CompactionTask",
+ targetMethod = "runMayThrow",
+ targetLocation = "INVOKE getCompactionAwareWriter",
+ condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+ action =
"org.apache.cassandra.db.compaction.CompactionControllerTest.createCompactionControllerLatch.countDown();"
+
+
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+
"(org.apache.cassandra.db.compaction.CompactionControllerTest.compaction2FinishLatch);"),
+ @BMRule(name = "Check overlaps",
+ targetClass = "CompactionTask",
+ targetMethod = "runMayThrow",
+ targetLocation = "INVOKE finish",
+ condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+ action =
"org.apache.cassandra.db.compaction.CompactionControllerTest.compaction1RefreshLatch.countDown();"
+
+
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+
"(org.apache.cassandra.db.compaction.CompactionControllerTest.refreshCheckLatch);"),
+ @BMRule(name = "Increment overlap refresh counter",
+ targetClass = "ColumnFamilyStore",
+ targetMethod = "getAndReferenceOverlappingLiveSSTables",
+ condition = "Thread.currentThread().getName().equals(\"compaction1\")",
+ action =
"org.apache.cassandra.db.compaction.CompactionControllerTest.incrementOverlapRefreshCounter();")
+ })
+ public void testIgnoreOverlaps() throws Exception
+ {
+ testOverlapIterator(true);
+ overlapRefreshCounter = 0;
+ compaction2FinishLatch = new CountDownLatch(1);
+ createCompactionControllerLatch = new CountDownLatch(1);
+ compaction1RefreshLatch = new CountDownLatch(1);
+ refreshCheckLatch = new CountDownLatch(1);
+ testOverlapIterator(false);
+ }
+
+ public void testOverlapIterator(boolean ignoreOverlaps) throws Exception
+ {
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ //create 2 overlapping sstables
+ DecoratedKey key = Util.dk("k1");
+ long timestamp1 = FBUtilities.timestampMicros();
+ long timestamp2 = timestamp1 - 5;
+ applyMutation(cfs.metadata, key, timestamp1);
+ cfs.forceBlockingFlush();
+ assertEquals(cfs.getLiveSSTables().size(), 1);
+ Set<SSTableReader> sstables = cfs.getLiveSSTables();
+
+ applyMutation(cfs.metadata, key, timestamp2);
+ cfs.forceBlockingFlush();
+ assertEquals(cfs.getLiveSSTables().size(), 2);
+ String sstable2 =
cfs.getLiveSSTables().iterator().next().getFilename();
+
+
System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY,
"true");
+ Map<String, String> options = new HashMap<>();
+
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY,
"30");
+
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY,
"SECONDS");
+
options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY,
"MILLISECONDS");
+
options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY,
"0");
+
options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY,
Boolean.toString(ignoreOverlaps));
+ TimeWindowCompactionStrategy twcs = new
TimeWindowCompactionStrategy(cfs, options);
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ twcs.addSSTable(sstable);
+
+ twcs.startup();
+
+ CompactionTask task =
(CompactionTask)twcs.getUserDefinedTask(sstables, 0);
+
+ assertNotNull(task);
+ assertEquals(1, Iterables.size(task.transaction.originals()));
+
+ //start a compaction for the first sstable (compaction1)
+ //the overlap iterator should contain sstable2
+ //this compaction will be paused by the BMRule
+ Thread t = new Thread(() -> {
+ task.execute(null);
+ });
+
+ //start a compaction for the second sstable (compaction2)
+ //the overlap iterator should contain sstable1
+ //this compaction should complete as normal
+ Thread t2 = new Thread(() -> {
+
Uninterruptibles.awaitUninterruptibly(createCompactionControllerLatch);
+ assertEquals(1, overlapRefreshCounter);
+ CompactionManager.instance.forceUserDefinedCompaction(sstable2);
+
+ //after compaction2 is finished, wait 1 minute and then resume
compaction1 (this gives enough time for the overlapIterator to be refreshed)
+ //after resuming, the overlap iterator for compaction1 should be
updated to include the new sstable created by compaction2,
+ //and it should not contain sstable2
+ try
+ {
+ TimeUnit.MINUTES.sleep(1);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ compaction2FinishLatch.countDown();
+ });
+
+ t.setName("compaction1");
+ t.start();
+ t2.start();
+
+ compaction1RefreshLatch.await();
+ //at this point, the overlap iterator for compaction1 should be
refreshed
+
+ //verify that the overlap iterator for compaction1 is refreshed twice
(once during the constructor, and again after compaction2 finishes)
+ assertEquals(2, overlapRefreshCounter);
+
+ refreshCheckLatch.countDown();
+ t.join();
+ }
+
private void applyMutation(CFMetaData cfm, DecoratedKey key, long
timestamp)
{
ByteBuffer val = ByteBufferUtil.bytes(1L);
@@ -206,4 +341,9 @@ public class CompactionControllerTest extends SchemaLoader
assertFalse(evaluator.test(boundary));
assertTrue(evaluator.test(boundary - 1));
}
+
+ public static void incrementOverlapRefreshCounter()
+ {
+ overlapRefreshCounter++;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]