This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0706d32b0bd478160deb0143deb9811d49050b10 Author: Marcus Eriksson <[email protected]> AuthorDate: Fri Feb 15 08:55:54 2019 +0100 Dont try to interrupt index compactions when starting anticompactions. Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-15024 --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 33 +++++++++---- .../cassandra/db/compaction/CompactionManager.java | 2 +- .../cassandra/db/repair/PendingAntiCompaction.java | 15 ++++-- .../db/compaction/CancelCompactionsTest.java | 56 +++++++++++++++++----- .../repair/AbstractPendingAntiCompactionTest.java | 14 +++++- .../db/repair/PendingAntiCompactionTest.java | 29 +++++++++++ 7 files changed, 122 insertions(+), 28 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7b06757..ffa251b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Don't try to cancel 2i compactions when starting anticompaction (CASSANDRA-15024) * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025) * SSL Cert Hot Reloading should check for sanity of the new keystore/truststore before loading it (CASSANDRA-14991) * Avoid leaking threads when failing anticompactions and rate limit anticompactions (CASSANDRA-15002) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index da75f43..c09b884 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2185,31 +2185,44 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews) { - return runWithCompactionsDisabled(callable, (sstable) -> true, interruptValidation, interruptViews); + return runWithCompactionsDisabled(callable, (sstable) -> true, interruptValidation, interruptViews, true); } - public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, boolean interruptViews) + /** + * Runs callable with compactions paused and compactions including sstables matching sstablePredicate stopped + * + * @param callable what to do when compactions are paused + * @param sstablesPredicate which sstables should we cancel compactions for + * @param interruptValidation if we should interrupt validation compactions + * @param interruptViews if we should interrupt view compactions + * @param interruptIndexes if we should interrupt compactions on indexes. NOTE: if you set this to true your sstablePredicate + * must be able to handle LocalPartitioner sstables! + */ + public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, boolean interruptViews, boolean interruptIndexes) { // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly, // and so we only run one major compaction at a time synchronized (this) { logger.trace("Cancelling in-progress compactions for {}", metadata.name); + Iterable<ColumnFamilyStore> toInterruptFor = interruptIndexes + ? concatWithIndexes() + : Collections.singleton(this); - Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews - ? Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs()) - : concatWithIndexes(); + toInterruptFor = interruptViews + ? Iterables.concat(toInterruptFor, viewManager.allViewsCfs()) + : toInterruptFor; - for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) + for (ColumnFamilyStore cfs : toInterruptFor) cfs.getCompactionStrategyManager().pause(); try { // interrupt in-progress compactions - CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, sstablesPredicate, interruptValidation); - CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs, sstablesPredicate); + CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation); + CompactionManager.instance.waitForCessation(toInterruptFor, sstablesPredicate); // doublecheck that we finished, instead of timing out - for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) + for (ColumnFamilyStore cfs : toInterruptFor) { if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate)) { @@ -2231,7 +2244,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } finally { - for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) + for (ColumnFamilyStore cfs : toInterruptFor) cfs.getCompactionStrategyManager().resume(); } } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 84a3543..85aff08 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -861,7 +861,7 @@ public class CompactionManager implements CompactionManagerMBean return null; } return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds())); - }, (sstable) -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges), false, false); + }, (sstable) -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges), false, false, false); if (tasks == null) return; diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java index 0449cf1..d5c3ca0 100644 --- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java +++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java @@ -140,8 +140,8 @@ public class PendingAntiCompaction { // todo: start tracking the parent repair session id that created the anticompaction to be able to give a better error messsage here: String message = String.format("Prepare phase for incremental repair session %s has failed because it encountered " + - "intersecting sstables belonging to another incremental repair session. This is " + - "caused by starting multiple conflicting incremental repairs at the same time", prsid); + "intersecting sstables (%s) belonging to another incremental repair session. This is " + + "caused by starting multiple conflicting incremental repairs at the same time", prsid, ci.getSSTables()); throw new SSTableAcquisitionException(message); } return true; @@ -185,6 +185,8 @@ public class PendingAntiCompaction LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); if (txn != null) return new AcquireResult(cfs, Refs.ref(sstables), txn); + else + logger.error("Could not mark compacting for {} (sstables = {}, compacting = {})", sessionID, sstables, cfs.getTracker().getCompacting()); } catch (SSTableAcquisitionException e) { @@ -212,7 +214,7 @@ public class PendingAntiCompaction { // Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled // is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs). - return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false); + return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false, false); } catch (SSTableAcquisitionException e) { @@ -224,9 +226,14 @@ public class PendingAntiCompaction Uninterruptibles.sleepUninterruptibly(acquireSleepMillis, TimeUnit.MILLISECONDS); if (System.currentTimeMillis() - start > delay) - logger.debug("{} Timed out waiting to acquire sstables", sessionID, e); + logger.warn("{} Timed out waiting to acquire sstables", sessionID, e); } + catch (Throwable t) + { + logger.error("Got exception disabling compactions for session {}", sessionID, t); + throw t; + } } while (System.currentTimeMillis() - start < delay); return null; } diff --git a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java index 4b05fc4..cb4ef4a 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java @@ -36,9 +36,11 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.repair.PendingAntiCompaction; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -61,14 +63,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -public class CancelCompactionsTest +public class CancelCompactionsTest extends CQLTester { - @BeforeClass - public static void setup() - { - DatabaseDescriptor.daemonInitialization(); - } - /** * makes sure we only cancel compactions if the precidate says we have overlapping sstables */ @@ -88,14 +84,14 @@ public class CancelCompactionsTest assertEquals(1, activeCompactions.size()); assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), toMarkCompacting); // predicate requires the non-compacting sstables, should not cancel the one currently compacting: - cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false); + cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false, true); assertEquals(1, activeCompactions.size()); assertFalse(activeCompactions.get(0).isStopRequested()); // predicate requires the compacting ones - make sure stop is requested and that when we abort that // compaction we actually run the callable (countdown the latch) CountDownLatch cdl = new CountDownLatch(1); - Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, false, false)); + Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, false, false, true)); t.start(); while (!activeCompactions.get(0).isStopRequested()) Thread.sleep(100); @@ -141,13 +137,13 @@ public class CancelCompactionsTest expectedSSTables.add(new HashSet<>(sstables.subList(6, 9))); assertEquals(compactingSSTables, expectedSSTables); - cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false); + cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false, true); assertEquals(2, activeCompactions.size()); assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested)); CountDownLatch cdl = new CountDownLatch(1); // start a compaction which only needs the sstables where first token is > 50 - these are the sstables compacted by tcts.get(1) - Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false)); + Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false, true)); t.start(); activeCompactions = CompactionManager.instance.active.getCompactions(); assertEquals(2, activeCompactions.size()); @@ -335,7 +331,7 @@ public class CancelCompactionsTest } } assertTrue(foundCompaction); - cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false); + cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false, true); // wait for the runWithCompactionsDisabled callable compactionsStopped.await(); assertEquals(1, CompactionManager.instance.active.getCompactions().size()); @@ -418,4 +414,40 @@ public class CancelCompactionsTest } } + + @Test + public void test2iCancellation() throws Throwable + { + createTable("create table %s (id int primary key, something int)"); + createIndex("create index on %s(something)"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 10; i++) + execute("insert into %s (id, something) values (?, ?)", i, i); + flush(); + ColumnFamilyStore idx = getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next(); + Set<SSTableReader> sstables = new HashSet<>(); + try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION)) + { + getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;}, false, false, false); + } + // the predicate only gets compacting sstables, and we are only compacting the 2i sstables - with interruptIndexes = false we should see no sstables here + assertTrue(sstables.isEmpty()); + } + + @Test + public void testSubrangeCompactionWith2i() throws Throwable + { + createTable("create table %s (id int primary key, something int)"); + createIndex("create index on %s(something)"); + getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 10; i++) + execute("insert into %s (id, something) values (?, ?)", i, i); + flush(); + ColumnFamilyStore idx = getCurrentColumnFamilyStore().indexManager.getAllIndexColumnFamilyStores().iterator().next(); + try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION)) + { + IPartitioner partitioner = getCurrentColumnFamilyStore().getPartitioner(); + getCurrentColumnFamilyStore().forceCompactionForTokenRange(Collections.singleton(new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken()))); + } + } } diff --git a/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java index 5adb7d6..62b7db1 100644 --- a/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java @@ -30,14 +30,18 @@ import org.junit.Ignore; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.repair.consistent.LocalSessionAccessor; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.Indexes; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; @@ -79,7 +83,15 @@ public abstract class AbstractPendingAntiCompactionTest { ks = "ks_" + System.currentTimeMillis(); cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); - TableMetadata cfm2 = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl2), ks).build(); + + Indexes.Builder indexes = Indexes.builder(); + indexes.add(IndexMetadata.fromIndexTargets(Collections.singletonList(new IndexTarget(new ColumnIdentifier("v", true), + IndexTarget.Type.VALUES)), + tbl2 + "_idx", + IndexMetadata.Kind.COMPOSITES, Collections.emptyMap())); + + TableMetadata cfm2 = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl2), ks).indexes(indexes.build()).build(); + SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm, cfm2); cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); cfs2 = Schema.instance.getColumnFamilyStoreInstance(cfm2.id); diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java index 1d4a97f..e44c697 100644 --- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java @@ -677,6 +677,35 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest } } + @Test + public void testWith2i() throws ExecutionException, InterruptedException + { + cfs2.disableAutoCompaction(); + makeSSTables(2, cfs2, 100); + ColumnFamilyStore idx = cfs2.indexManager.getAllIndexColumnFamilyStores().iterator().next(); + ExecutorService es = Executors.newFixedThreadPool(1); + try + { + UUID prsid = prepareSession(); + for (SSTableReader sstable : cfs2.getLiveSSTables()) + assertFalse(sstable.isPendingRepair()); + + // mark the sstables pending, with a 2i compaction going, which should be untouched; + try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION)) + { + PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs2), atEndpoint(FULL_RANGE, NO_RANGES), es); + pac.run().get(); + } + // and make sure it succeeded; + for (SSTableReader sstable : cfs2.getLiveSSTables()) + assertTrue(sstable.isPendingRepair()); + } + finally + { + es.shutdown(); + } + } + private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans) { RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
