This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new b773bc7  Make sure user defined compaction transactions are always closed
b773bc7 is described below

commit b773bc7ac51fedc07145017edaefa919fac25696
Author: Marcus Eriksson <[email protected]>
AuthorDate: Thu May 9 10:29:44 2019 +0200

    Make sure user defined compaction transactions are always closed
    
    Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-15123
---
 CHANGES.txt                                        |  1 +
 .../cassandra/db/compaction/CompactionManager.java | 58 +++++++------
 .../db/compaction/CompactionsBytemanTest.java      | 99 ++++++++++++++++++++++
 3 files changed, 134 insertions(+), 24 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 43dbda3..dc8baf2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.5
+ * Make sure user defined compaction transactions are always closed (CASSANDRA-15123)
  * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305)
  * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
  * Add flag to disable SASI indexes, and warnings on creation (CASSANDRA-14866)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e0ec179..7086d77 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -784,36 +784,39 @@ public class CompactionManager implements CompactionManagerMBean
 
     public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, 
Collection<Range<Token>> ranges)
     {
-        final Collection<AbstractCompactionTask> tasks = 
cfStore.runWithCompactionsDisabled(() ->
-                   {
-                       Collection<SSTableReader> sstables = 
sstablesInBounds(cfStore, ranges);
-                       if (sstables == null || sstables.isEmpty())
-                       {
-                           logger.debug("No sstables found for the provided 
token range");
-                           return null;
-                       }
-                       return 
cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, 
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
-                   }, false, false);
+        Callable<Collection<AbstractCompactionTask>> taskCreator = () -> {
+            Collection<SSTableReader> sstables = sstablesInBounds(cfStore, 
ranges);
+            if (sstables == null || sstables.isEmpty())
+            {
+                logger.debug("No sstables found for the provided token range");
+                return null;
+            }
+            return 
cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, 
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
+        };
+
+        final Collection<AbstractCompactionTask> tasks = 
cfStore.runWithCompactionsDisabled(taskCreator, false, false);
 
         if (tasks == null)
             return;
 
         Runnable runnable = new WrappedRunnable()
         {
-            protected void runMayThrow()
+            protected void runMayThrow() throws Exception
             {
-                for (AbstractCompactionTask task : tasks)
-                    if (task != null)
-                        task.execute(metrics);
+                try
+                {
+                    for (AbstractCompactionTask task : tasks)
+                        if (task != null)
+                            task.execute(metrics);
+                }
+                finally
+                {
+                    FBUtilities.closeAll(tasks.stream().map(task -> 
task.transaction).collect(Collectors.toList()));
+                }
             }
         };
 
-        if (executor.isShutdown())
-        {
-            logger.info("Compaction executor has shut down, not submitting 
task");
-            return;
-        }
-        FBUtilities.waitOnFuture(executor.submit(runnable));
+        FBUtilities.waitOnFuture(executor.submitIfRunning(runnable, "force 
compaction for token range"));
     }
 
     private static Collection<SSTableReader> 
sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> 
tokenRangeCollection)
@@ -915,7 +918,7 @@ public class CompactionManager implements CompactionManagerMBean
     {
         Runnable runnable = new WrappedRunnable()
         {
-            protected void runMayThrow()
+            protected void runMayThrow() throws Exception
             {
                 // look up the sstables now that we're on the compaction 
executor, so we don't try to re-compact
                 // something that was already being compacted earlier.
@@ -941,10 +944,17 @@ public class CompactionManager implements CompactionManagerMBean
                 else
                 {
                     List<AbstractCompactionTask> tasks = 
cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
-                    for (AbstractCompactionTask task : tasks)
+                    try
                     {
-                        if (task != null)
-                            task.execute(metrics);
+                        for (AbstractCompactionTask task : tasks)
+                        {
+                            if (task != null)
+                                task.execute(metrics);
+                        }
+                    }
+                    finally
+                    {
+                        FBUtilities.closeAll(tasks.stream().map(task -> 
task.transaction).collect(Collectors.toList()));
                     }
                 }
             }
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
index 0b391a5..d5f2800 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
@@ -18,17 +18,32 @@
 
 package org.apache.cassandra.db.compaction;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.utils.FBUtilities;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(BMUnitRunner.class)
 public class CompactionsBytemanTest extends CQLTester
@@ -53,4 +68,88 @@ public class CompactionsBytemanTest extends CQLTester
         
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
         assertEquals(0, CompactionManager.instance.compactingCF.count(cfs));
     }
+
+    @Test
+    @BMRule(name = "Stop all compactions",
+    targetClass = "CompactionTask",
+    targetMethod = "runMayThrow",
+    targetLocation = "AT INVOKE getCompactionAwareWriter",
+    action = "$ci.stop()")
+    public void testStopUserDefinedCompactionRepaired() throws Throwable
+    {
+        testStopCompactionRepaired((cfs) -> {
+            Collection<Descriptor> files = 
cfs.getLiveSSTables().stream().map(s -> 
s.descriptor).collect(Collectors.toList());
+            
FBUtilities.waitOnFuture(CompactionManager.instance.submitUserDefined(cfs, 
files, CompactionManager.NO_GC));
+        });
+    }
+
+    @Test
+    @BMRule(name = "Stop all compactions",
+    targetClass = "CompactionTask",
+    targetMethod = "runMayThrow",
+    targetLocation = "AT INVOKE getCompactionAwareWriter",
+    action = "$ci.stop()")
+    public void testStopSubRangeCompactionRepaired() throws Throwable
+    {
+        testStopCompactionRepaired((cfs) -> {
+            Collection<Range<Token>> ranges = Collections.singleton(new 
Range<>(cfs.getPartitioner().getMinimumToken(),
+                                                                               
 cfs.getPartitioner().getMaximumToken()));
+            CompactionManager.instance.forceCompactionForTokenRange(cfs, 
ranges);
+        });
+    }
+
+    public void testStopCompactionRepaired(Consumer<ColumnFamilyStore> 
compactionRunner) throws Throwable
+    {
+        String table = createTable("CREATE TABLE %s (k INT, c INT, v INT, 
PRIMARY KEY (k, c))");
+        ColumnFamilyStore cfs = 
Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(table);
+        cfs.disableAutoCompaction();
+        for (int i = 0; i < 5; i++)
+        {
+            for (int j = 0; j < 10; j++)
+            {
+                execute("insert into %s (k, c, v) values (?, ?, ?)", i, j, 
i*j);
+            }
+            cfs.forceBlockingFlush();
+        }
+        setRepaired(cfs, cfs.getLiveSSTables());
+        for (int i = 0; i < 5; i++)
+        {
+            for (int j = 0; j < 10; j++)
+            {
+                execute("insert into %s (k, c, v) values (?, ?, ?)", i, j, 
i*j);
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+        assertTrue(CompactionMetrics.getCompactions().stream().noneMatch(h -> 
h.getCompactionInfo().getCFMetaData().equals(cfs.metadata)));
+
+        try
+        {
+            compactionRunner.accept(cfs);
+            fail("compaction should fail");
+        }
+        catch (RuntimeException t)
+        {
+            if (!(t.getCause().getCause() instanceof 
CompactionInterruptedException))
+                throw t;
+            //expected
+        }
+
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+        assertTrue(CompactionMetrics.getCompactions().stream().noneMatch(h -> 
h.getCompactionInfo().getCFMetaData().equals(cfs.metadata)));
+
+    }
+
+    private void setRepaired(ColumnFamilyStore cfs, Iterable<SSTableReader> 
sstables) throws IOException
+    {
+        Set<SSTableReader> changed = new HashSet<>();
+        for (SSTableReader sstable: sstables)
+        {
+            
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 
System.currentTimeMillis());
+            sstable.reloadSSTableMetadata();
+            changed.add(sstable);
+        }
+        cfs.getTracker().notifySSTableRepairedStatusChanged(changed);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to