This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new b773bc7 Make sure user defined compaction transactions are always
closed
b773bc7 is described below
commit b773bc7ac51fedc07145017edaefa919fac25696
Author: Marcus Eriksson <[email protected]>
AuthorDate: Thu May 9 10:29:44 2019 +0200
Make sure user defined compaction transactions are always closed
Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-15123
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionManager.java | 58 +++++++------
.../db/compaction/CompactionsBytemanTest.java | 99 ++++++++++++++++++++++
3 files changed, 134 insertions(+), 24 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 43dbda3..dc8baf2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.5
+ * Make sure user defined compaction transactions are always closed
(CASSANDRA-15123)
* Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config
(CASSANDRA-14305)
* Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
* Add flag to disable SASI indexes, and warnings on creation (CASSANDRA-14866)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e0ec179..7086d77 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -784,36 +784,39 @@ public class CompactionManager implements
CompactionManagerMBean
public void forceCompactionForTokenRange(ColumnFamilyStore cfStore,
Collection<Range<Token>> ranges)
{
- final Collection<AbstractCompactionTask> tasks =
cfStore.runWithCompactionsDisabled(() ->
- {
- Collection<SSTableReader> sstables =
sstablesInBounds(cfStore, ranges);
- if (sstables == null || sstables.isEmpty())
- {
- logger.debug("No sstables found for the provided
token range");
- return null;
- }
- return
cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables,
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
- }, false, false);
+ Callable<Collection<AbstractCompactionTask>> taskCreator = () -> {
+ Collection<SSTableReader> sstables = sstablesInBounds(cfStore,
ranges);
+ if (sstables == null || sstables.isEmpty())
+ {
+ logger.debug("No sstables found for the provided token range");
+ return null;
+ }
+ return
cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables,
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
+ };
+
+ final Collection<AbstractCompactionTask> tasks =
cfStore.runWithCompactionsDisabled(taskCreator, false, false);
if (tasks == null)
return;
Runnable runnable = new WrappedRunnable()
{
- protected void runMayThrow()
+ protected void runMayThrow() throws Exception
{
- for (AbstractCompactionTask task : tasks)
- if (task != null)
- task.execute(metrics);
+ try
+ {
+ for (AbstractCompactionTask task : tasks)
+ if (task != null)
+ task.execute(metrics);
+ }
+ finally
+ {
+ FBUtilities.closeAll(tasks.stream().map(task ->
task.transaction).collect(Collectors.toList()));
+ }
}
};
- if (executor.isShutdown())
- {
- logger.info("Compaction executor has shut down, not submitting
task");
- return;
- }
- FBUtilities.waitOnFuture(executor.submit(runnable));
+ FBUtilities.waitOnFuture(executor.submitIfRunning(runnable, "force
compaction for token range"));
}
private static Collection<SSTableReader>
sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>>
tokenRangeCollection)
@@ -915,7 +918,7 @@ public class CompactionManager implements
CompactionManagerMBean
{
Runnable runnable = new WrappedRunnable()
{
- protected void runMayThrow()
+ protected void runMayThrow() throws Exception
{
// look up the sstables now that we're on the compaction
executor, so we don't try to re-compact
// something that was already being compacted earlier.
@@ -941,10 +944,17 @@ public class CompactionManager implements
CompactionManagerMBean
else
{
List<AbstractCompactionTask> tasks =
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
- for (AbstractCompactionTask task : tasks)
+ try
{
- if (task != null)
- task.execute(metrics);
+ for (AbstractCompactionTask task : tasks)
+ {
+ if (task != null)
+ task.execute(metrics);
+ }
+ }
+ finally
+ {
+ FBUtilities.closeAll(tasks.stream().map(task ->
task.transaction).collect(Collectors.toList()));
}
}
}
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
index 0b391a5..d5f2800 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
@@ -18,17 +18,32 @@
package org.apache.cassandra.db.compaction;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.utils.FBUtilities;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@RunWith(BMUnitRunner.class)
public class CompactionsBytemanTest extends CQLTester
@@ -53,4 +68,88 @@ public class CompactionsBytemanTest extends CQLTester
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
assertEquals(0, CompactionManager.instance.compactingCF.count(cfs));
}
+
+ @Test
+ @BMRule(name = "Stop all compactions",
+ targetClass = "CompactionTask",
+ targetMethod = "runMayThrow",
+ targetLocation = "AT INVOKE getCompactionAwareWriter",
+ action = "$ci.stop()")
+ public void testStopUserDefinedCompactionRepaired() throws Throwable
+ {
+ testStopCompactionRepaired((cfs) -> {
+ Collection<Descriptor> files =
cfs.getLiveSSTables().stream().map(s ->
s.descriptor).collect(Collectors.toList());
+
FBUtilities.waitOnFuture(CompactionManager.instance.submitUserDefined(cfs,
files, CompactionManager.NO_GC));
+ });
+ }
+
+ @Test
+ @BMRule(name = "Stop all compactions",
+ targetClass = "CompactionTask",
+ targetMethod = "runMayThrow",
+ targetLocation = "AT INVOKE getCompactionAwareWriter",
+ action = "$ci.stop()")
+ public void testStopSubRangeCompactionRepaired() throws Throwable
+ {
+ testStopCompactionRepaired((cfs) -> {
+ Collection<Range<Token>> ranges = Collections.singleton(new
Range<>(cfs.getPartitioner().getMinimumToken(),
+
cfs.getPartitioner().getMaximumToken()));
+ CompactionManager.instance.forceCompactionForTokenRange(cfs,
ranges);
+ });
+ }
+
+ public void testStopCompactionRepaired(Consumer<ColumnFamilyStore>
compactionRunner) throws Throwable
+ {
+ String table = createTable("CREATE TABLE %s (k INT, c INT, v INT,
PRIMARY KEY (k, c))");
+ ColumnFamilyStore cfs =
Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(table);
+ cfs.disableAutoCompaction();
+ for (int i = 0; i < 5; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ execute("insert into %s (k, c, v) values (?, ?, ?)", i, j,
i*j);
+ }
+ cfs.forceBlockingFlush();
+ }
+ setRepaired(cfs, cfs.getLiveSSTables());
+ for (int i = 0; i < 5; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ execute("insert into %s (k, c, v) values (?, ?, ?)", i, j,
i*j);
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ assertTrue(cfs.getTracker().getCompacting().isEmpty());
+ assertTrue(CompactionMetrics.getCompactions().stream().noneMatch(h ->
h.getCompactionInfo().getCFMetaData().equals(cfs.metadata)));
+
+ try
+ {
+ compactionRunner.accept(cfs);
+ fail("compaction should fail");
+ }
+ catch (RuntimeException t)
+ {
+ if (!(t.getCause().getCause() instanceof
CompactionInterruptedException))
+ throw t;
+ //expected
+ }
+
+ assertTrue(cfs.getTracker().getCompacting().isEmpty());
+ assertTrue(CompactionMetrics.getCompactions().stream().noneMatch(h ->
h.getCompactionInfo().getCFMetaData().equals(cfs.metadata)));
+
+ }
+
+ private void setRepaired(ColumnFamilyStore cfs, Iterable<SSTableReader>
sstables) throws IOException
+ {
+ Set<SSTableReader> changed = new HashSet<>();
+ for (SSTableReader sstable: sstables)
+ {
+
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor,
System.currentTimeMillis());
+ sstable.reloadSSTableMetadata();
+ changed.add(sstable);
+ }
+ cfs.getTracker().notifySSTableRepairedStatusChanged(changed);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]