This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0706d32b0bd478160deb0143deb9811d49050b10
Author: Marcus Eriksson <[email protected]>
AuthorDate: Fri Feb 15 08:55:54 2019 +0100

    Don't try to interrupt index compactions when starting anticompactions.
    
    Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-15024
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 33 +++++++++----
 .../cassandra/db/compaction/CompactionManager.java |  2 +-
 .../cassandra/db/repair/PendingAntiCompaction.java | 15 ++++--
 .../db/compaction/CancelCompactionsTest.java       | 56 +++++++++++++++++-----
 .../repair/AbstractPendingAntiCompactionTest.java  | 14 +++++-
 .../db/repair/PendingAntiCompactionTest.java       | 29 +++++++++++
 7 files changed, 122 insertions(+), 28 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7b06757..ffa251b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Don't try to cancel 2i compactions when starting anticompaction 
(CASSANDRA-15024)
  * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025)
  * SSL Cert Hot Reloading should check for sanity of the new 
keystore/truststore before loading it (CASSANDRA-14991)
  * Avoid leaking threads when failing anticompactions and rate limit 
anticompactions (CASSANDRA-15002)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index da75f43..c09b884 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2185,31 +2185,44 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean 
interruptValidation, boolean interruptViews)
     {
-        return runWithCompactionsDisabled(callable, (sstable) -> true, 
interruptValidation, interruptViews);
+        return runWithCompactionsDisabled(callable, (sstable) -> true, 
interruptValidation, interruptViews, true);
     }
 
-    public <V> V runWithCompactionsDisabled(Callable<V> callable, 
Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, 
boolean interruptViews)
+    /**
+     * Runs callable with compactions paused and compactions including 
sstables matching sstablesPredicate stopped
+     *
+     * @param callable what to do when compactions are paused
+     * @param sstablesPredicate which sstables should we cancel compactions for
+     * @param interruptValidation if we should interrupt validation compactions
+     * @param interruptViews if we should interrupt view compactions
+     * @param interruptIndexes if we should interrupt compactions on indexes. 
NOTE: if you set this to true your sstablesPredicate
+     *                         must be able to handle LocalPartitioner 
sstables!
+     */
+    public <V> V runWithCompactionsDisabled(Callable<V> callable, 
Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, 
boolean interruptViews, boolean interruptIndexes)
     {
         // synchronize so that concurrent invocations don't re-enable 
compactions partway through unexpectedly,
         // and so we only run one major compaction at a time
         synchronized (this)
         {
             logger.trace("Cancelling in-progress compactions for {}", 
metadata.name);
+            Iterable<ColumnFamilyStore> toInterruptFor = interruptIndexes
+                                                         ? concatWithIndexes()
+                                                         : 
Collections.singleton(this);
 
-            Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
-                                                               ? 
Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
-                                                               : 
concatWithIndexes();
+            toInterruptFor = interruptViews
+                             ? Iterables.concat(toInterruptFor, 
viewManager.allViewsCfs())
+                             : toInterruptFor;
 
-            for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+            for (ColumnFamilyStore cfs : toInterruptFor)
                 cfs.getCompactionStrategyManager().pause();
             try
             {
                 // interrupt in-progress compactions
-                
CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, 
sstablesPredicate, interruptValidation);
-                
CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs, 
sstablesPredicate);
+                
CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, 
sstablesPredicate, interruptValidation);
+                CompactionManager.instance.waitForCessation(toInterruptFor, 
sstablesPredicate);
 
                 // doublecheck that we finished, instead of timing out
-                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+                for (ColumnFamilyStore cfs : toInterruptFor)
                 {
                     if 
(cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
                     {
@@ -2231,7 +2244,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             }
             finally
             {
-                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+                for (ColumnFamilyStore cfs : toInterruptFor)
                     cfs.getCompactionStrategyManager().resume();
             }
         }
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 84a3543..85aff08 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -861,7 +861,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                            return null;
                        }
                        return 
cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, 
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
-                   }, (sstable) -> new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges), false, false);
+                   }, (sstable) -> new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges), false, false, false);
 
         if (tasks == null)
             return;
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java 
b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 0449cf1..d5c3ca0 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -140,8 +140,8 @@ public class PendingAntiCompaction
             {
                 // todo: start tracking the parent repair session id that 
created the anticompaction to be able to give a better error messsage here:
                 String message = String.format("Prepare phase for incremental 
repair session %s has failed because it encountered " +
-                                               "intersecting sstables 
belonging to another incremental repair session. This is " +
-                                               "caused by starting multiple 
conflicting incremental repairs at the same time", prsid);
+                                               "intersecting sstables (%s) 
belonging to another incremental repair session. This is " +
+                                               "caused by starting multiple 
conflicting incremental repairs at the same time", prsid, ci.getSSTables());
                 throw new SSTableAcquisitionException(message);
             }
             return true;
@@ -185,6 +185,8 @@ public class PendingAntiCompaction
                 LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
                 if (txn != null)
                     return new AcquireResult(cfs, Refs.ref(sstables), txn);
+                else
+                    logger.error("Could not mark compacting for {} (sstables = 
{}, compacting = {})", sessionID, sstables, cfs.getTracker().getCompacting());
             }
             catch (SSTableAcquisitionException e)
             {
@@ -212,7 +214,7 @@ public class PendingAntiCompaction
                 {
                     // Note that anticompactions are not disabled when running 
this. This is safe since runWithCompactionsDisabled
                     // is synchronized - acquireTuple and predicate can only 
be run by a single thread (for the given cfs).
-                    return cfs.runWithCompactionsDisabled(this::acquireTuple, 
predicate, false, false);
+                    return cfs.runWithCompactionsDisabled(this::acquireTuple, 
predicate, false, false, false);
                 }
                 catch (SSTableAcquisitionException e)
                 {
@@ -224,9 +226,14 @@ public class PendingAntiCompaction
                     Uninterruptibles.sleepUninterruptibly(acquireSleepMillis, 
TimeUnit.MILLISECONDS);
 
                     if (System.currentTimeMillis() - start > delay)
-                        logger.debug("{} Timed out waiting to acquire 
sstables", sessionID, e);
+                        logger.warn("{} Timed out waiting to acquire 
sstables", sessionID, e);
 
                 }
+                catch (Throwable t)
+                {
+                    logger.error("Got exception disabling compactions for 
session {}", sessionID, t);
+                    throw t;
+                }
             } while (System.currentTimeMillis() - start < delay);
             return null;
         }
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
index 4b05fc4..cb4ef4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@ -36,9 +36,11 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.repair.PendingAntiCompaction;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -61,14 +63,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class CancelCompactionsTest
+public class CancelCompactionsTest extends CQLTester
 {
-    @BeforeClass
-    public static void setup()
-    {
-        DatabaseDescriptor.daemonInitialization();
-    }
-
     /**
      * makes sure we only cancel compactions if the precidate says we have 
overlapping sstables
      */
@@ -88,14 +84,14 @@ public class CancelCompactionsTest
             assertEquals(1, activeCompactions.size());
             
assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), 
toMarkCompacting);
             // predicate requires the non-compacting sstables, should not 
cancel the one currently compacting:
-            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> 
!toMarkCompacting.contains(sstable), false, false);
+            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> 
!toMarkCompacting.contains(sstable), false, false, true);
             assertEquals(1, activeCompactions.size());
             assertFalse(activeCompactions.get(0).isStopRequested());
 
             // predicate requires the compacting ones - make sure stop is 
requested and that when we abort that
             // compaction we actually run the callable (countdown the latch)
             CountDownLatch cdl = new CountDownLatch(1);
-            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { 
cdl.countDown(); return null; }, toMarkCompacting::contains, false, false));
+            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { 
cdl.countDown(); return null; }, toMarkCompacting::contains, false, false, 
true));
             t.start();
             while (!activeCompactions.get(0).isStopRequested())
                 Thread.sleep(100);
@@ -141,13 +137,13 @@ public class CancelCompactionsTest
             expectedSSTables.add(new HashSet<>(sstables.subList(6, 9)));
             assertEquals(compactingSSTables, expectedSSTables);
 
-            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, 
false, false);
+            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, 
false, false, true);
             assertEquals(2, activeCompactions.size());
             
assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested));
 
             CountDownLatch cdl = new CountDownLatch(1);
             // start a compaction which only needs the sstables where first 
token is > 50 - these are the sstables compacted by tcts.get(1)
-            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { 
cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, 
false));
+            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { 
cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, 
false, true));
             t.start();
             activeCompactions = 
CompactionManager.instance.active.getCompactions();
             assertEquals(2, activeCompactions.size());
@@ -335,7 +331,7 @@ public class CancelCompactionsTest
             }
         }
         assertTrue(foundCompaction);
-        cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); 
return null;}, (sstable) -> true, false, false);
+        cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); 
return null;}, (sstable) -> true, false, false, true);
         // wait for the runWithCompactionsDisabled callable
         compactionsStopped.await();
         assertEquals(1, 
CompactionManager.instance.active.getCompactions().size());
@@ -418,4 +414,40 @@ public class CancelCompactionsTest
 
         }
     }
+
+    @Test
+    public void test2iCancellation() throws Throwable
+    {
+        createTable("create table %s (id int primary key, something int)");
+        createIndex("create index on %s(something)");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, something) values (?, ?)", i, i);
+        flush();
+        ColumnFamilyStore idx = 
getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next();
+        Set<SSTableReader> sstables = new HashSet<>();
+        try (LifecycleTransaction txn = 
idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
+        {
+            getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> 
true, (sstable) -> { sstables.add(sstable); return true;}, false, false, false);
+        }
+        // the predicate only gets compacting sstables, and we are only 
compacting the 2i sstables - with interruptIndexes = false we should see no 
sstables here
+        assertTrue(sstables.isEmpty());
+    }
+
+    @Test
+    public void testSubrangeCompactionWith2i() throws Throwable
+    {
+        createTable("create table %s (id int primary key, something int)");
+        createIndex("create index on %s(something)");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 10; i++)
+            execute("insert into %s (id, something) values (?, ?)", i, i);
+        flush();
+        ColumnFamilyStore idx = 
getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next();
+        try (LifecycleTransaction txn = 
idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
+        {
+            IPartitioner partitioner = 
getCurrentColumnFamilyStore().getPartitioner();
+            
getCurrentColumnFamilyStore().forceCompactionForTokenRange(Collections.singleton(new
 Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken())));
+        }
+    }
 }
diff --git 
a/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
 
b/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
index 5adb7d6..62b7db1 100644
--- 
a/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
+++ 
b/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
@@ -30,14 +30,18 @@ import org.junit.Ignore;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Indexes;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
@@ -79,7 +83,15 @@ public abstract class AbstractPendingAntiCompactionTest
     {
         ks = "ks_" + System.currentTimeMillis();
         cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k 
INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
-        TableMetadata cfm2 = CreateTableStatement.parse(String.format("CREATE 
TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl2), ks).build();
+
+        Indexes.Builder indexes = Indexes.builder();
+        
indexes.add(IndexMetadata.fromIndexTargets(Collections.singletonList(new 
IndexTarget(new ColumnIdentifier("v", true),
+                                                                               
              IndexTarget.Type.VALUES)),
+                                                   tbl2 + "_idx",
+                                                   
IndexMetadata.Kind.COMPOSITES, Collections.emptyMap()));
+
+        TableMetadata cfm2 = CreateTableStatement.parse(String.format("CREATE 
TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl2), 
ks).indexes(indexes.build()).build();
+
         SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm, cfm2);
         cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
         cfs2 = Schema.instance.getColumnFamilyStoreInstance(cfm2.id);
diff --git 
a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java 
b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 1d4a97f..e44c697 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -677,6 +677,35 @@ public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
         }
     }
 
+    @Test
+    public void testWith2i() throws ExecutionException, InterruptedException
+    {
+        cfs2.disableAutoCompaction();
+        makeSSTables(2, cfs2, 100);
+        ColumnFamilyStore idx = 
cfs2.indexManager.getAllIndexColumnFamilyStores().iterator().next();
+        ExecutorService es = Executors.newFixedThreadPool(1);
+        try
+        {
+            UUID prsid = prepareSession();
+            for (SSTableReader sstable : cfs2.getLiveSSTables())
+                assertFalse(sstable.isPendingRepair());
+
+            // mark the sstables pending, with a 2i compaction going, which 
should be untouched;
+            try (LifecycleTransaction txn = 
idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
+            {
+                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs2), atEndpoint(FULL_RANGE, NO_RANGES), es);
+                pac.run().get();
+            }
+            // and make sure it succeeded;
+            for (SSTableReader sstable : cfs2.getLiveSSTables())
+                assertTrue(sstable.isPendingRepair());
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
     private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, 
Collection<Range<Token>> trans)
     {
         RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to