Repository: cassandra Updated Branches: refs/heads/trunk f4da90aca -> e9cc805db
Fix race / ref leak in anticompaction Patch by Blake Eggleston; Reviewed by Ariel Weisberg for CASSANDRA-13688 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e9cc805d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e9cc805d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e9cc805d Branch: refs/heads/trunk Commit: e9cc805db1133982c022657f8cab86cd24b3686f Parents: f4da90a Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Wed Jul 12 14:47:48 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Fri Aug 11 10:24:17 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/AbstractCompactionTask.java | 40 +++++ .../db/compaction/CompactionManager.java | 45 +++--- .../db/compaction/PendingRepairManager.java | 43 ++--- .../db/compaction/AntiCompactionTest.java | 40 +++++ .../db/compaction/CompactionTaskTest.java | 157 +++++++++++++++++++ .../consistent/PendingAntiCompactionTest.java | 23 +++ 7 files changed, 312 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 988f93d..7c9d79a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Fix race / ref leak in anticompaction (CASSANDRA-13688) * Expose tasks queue length via JMX (CASSANDRA-12758) * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751) * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index 430c916..c542a51 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -17,7 +17,11 @@ */ package org.apache.cassandra.db.compaction; +import java.util.Iterator; import java.util.Set; +import java.util.UUID; + +import com.google.common.base.Preconditions; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; @@ -49,6 +53,42 @@ public abstract class AbstractCompactionTask extends WrappedRunnable Set<SSTableReader> compacting = transaction.tracker.getCompacting(); for (SSTableReader sstable : transaction.originals()) assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting"; + + validateSSTables(transaction.originals()); + } + + /** + * Confirm that we're not attempting to compact repaired/unrepaired/pending repair sstables together + */ + private void validateSSTables(Set<SSTableReader> sstables) + { + // do not allow to be compacted together + if (!sstables.isEmpty()) + { + Iterator<SSTableReader> iter = sstables.iterator(); + SSTableReader first = iter.next(); + boolean isRepaired = first.isRepaired(); + UUID pendingRepair = first.getPendingRepair(); + while (iter.hasNext()) + { + SSTableReader next = iter.next(); + Preconditions.checkArgument(isRepaired == next.isRepaired(), + "Cannot compact repaired and unrepaired sstables"); + + if (pendingRepair == null) + { + Preconditions.checkArgument(!next.isPendingRepair(), + "Cannot compact pending repair and non-pending repair sstables"); + } + else + { + Preconditions.checkArgument(next.isPendingRepair(), + "Cannot compact pending repair and non-pending repair sstables"); + Preconditions.checkArgument(pendingRepair.equals(next.getPendingRepair()), + "Cannot compact sstables from different pending repairs"); + } + } + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index bc372f5..c7c86c6 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -129,6 +129,12 @@ public class CompactionManager implements CompactionManagerMBean private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE); + @VisibleForTesting + CompactionMetrics getMetrics() + { + return metrics; + } + /** * Gets compaction rate limiter. * Rate unit is bytes per sec. @@ -592,16 +598,19 @@ public class CompactionManager implements CompactionManagerMBean } }; - ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, null); + ListenableFuture<?> task = null; try { - executor.submitIfRunning(task, "pending anticompaction"); + task = executor.submitIfRunning(runnable, "pending anticompaction"); return task; } finally { - if (task.isCancelled()) + if (task == null || task.isCancelled()) + { sstables.release(); + txn.abort(); + } } } @@ -625,24 +634,24 @@ public class CompactionManager implements CompactionManagerMBean UUID pendingRepair, UUID parentRepairSession) throws InterruptedException, IOException { - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession); - Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews"); + try + { + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession); + Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews"); - logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size()); - logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges); - Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); - Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); - // we should only notify that repair status changed if it actually did: - Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>(); - Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>(); - for (SSTableReader sstable : sstables) - wasRepairedBefore.put(sstable, sstable.isRepaired()); + logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size()); + logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges); + Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); + Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); + // we should only notify that repair status changed if it actually did: + Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>(); + Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>(); + for (SSTableReader sstable : sstables) + wasRepairedBefore.put(sstable, sstable.isRepaired()); - Set<SSTableReader> nonAnticompacting = new HashSet<>(); + Set<SSTableReader> nonAnticompacting = new HashSet<>(); - Iterator<SSTableReader> sstableIterator = sstables.iterator(); - try - { + Iterator<SSTableReader> sstableIterator = sstables.iterator(); List<Range<Token>> normalizedRanges = Range.normalize(ranges); while (sstableIterator.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index 183af7a..2786396 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@ -134,7 +134,7 @@ class PendingRepairManager private synchronized void removeSession(UUID sessionID) { - if (!strategies.containsKey(sessionID)) + if (!strategies.containsKey(sessionID) || !strategies.get(sessionID).getSSTables().isEmpty()) return; logger.debug("Removing compaction strategy for pending repair {} on {}.{}", sessionID, cfs.metadata.keyspace, cfs.metadata.name); @@ -424,14 +424,31 @@ class PendingRepairManager protected void runMayThrow() throws Exception { - for (SSTableReader sstable : transaction.originals()) + boolean completed = false; + try { - logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, sstable, sessionID); - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, ActiveRepairService.NO_PENDING_REPAIR); - sstable.reloadSSTableMetadata(); + logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID); + for (SSTableReader sstable : transaction.originals()) + { + sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, ActiveRepairService.NO_PENDING_REPAIR); + sstable.reloadSSTableMetadata(); + } + completed = true; + } + finally + { + // even if we weren't able to rewrite all the sstable metedata, we should still move the ones that were + cfs.getTracker().notifySSTableRepairedStatusChanged(transaction.originals()); + + // we always abort because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll + // anything back. Also, we don't want to obsolete the originals. We're only using it to prevent other + // compactions from marking these sstables compacting, and unmarking them when we're done + transaction.abort(); + if (completed) + { + removeSession(sessionID); + } } - cfs.getTracker().notifySSTableRepairedStatusChanged(transaction.originals()); - transaction.abort(); } public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) @@ -444,18 +461,6 @@ class PendingRepairManager run(); return transaction.originals().size(); } - - public int execute(CompactionManager.CompactionExecutorStatsCollector collector) - { - try - { - return super.execute(collector); - } - finally - { - removeSession(sessionID); - } - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 8db194b..5f05fab 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -29,6 +29,7 @@ import java.util.UUID; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.After; import org.junit.Ignore; @@ -55,6 +56,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.utils.concurrent.Transactional; import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; import static org.hamcrest.CoreMatchers.is; @@ -397,4 +399,42 @@ public class AntiCompactionTest return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired())); } + /** + * If the parent repair session is missing, we should still clean up + */ + @Test + public void missingParentRepairSession() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.disableAutoCompaction(); + + for (int table = 0; table < 10; table++) + { + generateSStable(store,Integer.toString(table)); + } + Collection<SSTableReader> sstables = getUnrepairedSSTables(store); + assertEquals(10, sstables.size()); + + Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes())); + List<Range<Token>> ranges = Arrays.asList(range); + + UUID missingRepairSession = UUIDGen.getTimeUUID(); + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { + Assert.assertFalse(refs.isEmpty()); + try + { + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, missingRepairSession, missingRepairSession); + Assert.fail("expected RuntimeException"); + } + catch (RuntimeException e) + { + // expected + } + Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state()); + Assert.assertTrue(refs.isEmpty()); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java new file mode 100644 index 0000000..4640248 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.Transactional; + +public class CompactionTaskTest +{ + private static TableMetadata cfm; + private static ColumnFamilyStore cfs; + + @BeforeClass + public static void setUpClass() throws Exception + { + SchemaLoader.prepareServer(); + cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "coordinatorsessiontest").build(); + SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + @Before + public void setUp() throws Exception + { + cfs.getCompactionStrategyManager().enable(); + cfs.truncateBlocking(); + } + + @Test + public void compactionInterruption() throws Exception + { + cfs.getCompactionStrategyManager().disable(); + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (1, 1);"); + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (2, 2);"); + cfs.forceBlockingFlush(); + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (3, 3);"); + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (4, 4);"); + cfs.forceBlockingFlush(); + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + + Assert.assertEquals(2, sstables.size()); + + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); + Assert.assertNotNull(txn); + CompactionTask task = new CompactionTask(cfs, txn, 0); + Assert.assertNotNull(task); + cfs.getCompactionStrategyManager().pause(); + try + { + task.execute(CompactionManager.instance.getMetrics()); + Assert.fail("Expected CompactionInterruptedException"); + } + catch (CompactionInterruptedException e) + { + // expected + } + Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state()); + } + + private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException + { + sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair); + sstable.reloadSSTableMetadata(); + } + + /** + * If we try to create a compaction task that will mix + * repaired/unrepaired/pending repair sstables, it should fail + */ + @Test + public void mixedSSTableFailure() throws Exception + { + cfs.getCompactionStrategyManager().disable(); + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (1, 1);"); + cfs.forceBlockingFlush(); + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (2, 2);"); + cfs.forceBlockingFlush(); + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (3, 3);"); + cfs.forceBlockingFlush(); + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (4, 4);"); + cfs.forceBlockingFlush(); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + Assert.assertEquals(4, sstables.size()); + + SSTableReader unrepaired = sstables.get(0); + SSTableReader repaired = sstables.get(1); + SSTableReader pending1 = sstables.get(2); + SSTableReader pending2 = sstables.get(3); + + mutateRepaired(repaired, FBUtilities.nowInSeconds(), ActiveRepairService.NO_PENDING_REPAIR); + mutateRepaired(pending1, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); + mutateRepaired(pending2, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); + + LifecycleTransaction txn = null; + List<SSTableReader> toCompact = new ArrayList<>(sstables); + for (int i=0; i<sstables.size(); i++) + { + try + { + txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); + Assert.assertNotNull(txn); + CompactionTask task = new CompactionTask(cfs, txn, 0); + Assert.fail("Expected IllegalArgumentException"); + } + catch (IllegalArgumentException e) + { + // expected + } + finally + { + if (txn != null) + txn.abort(); + } + Collections.rotate(toCompact, 1); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java index 3119453..5aeab3e 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; @@ -339,4 +340,26 @@ public class PendingAntiCompactionTest Assert.assertTrue(cb.submittedCompactions.contains(cfm.id)); Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id)); } + + + @Test + public void singleAnticompaction() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + UUID sessionID = UUIDGen.getTimeUUID(); + ActiveRepairService.instance.registerParentRepairSession(sessionID, + InetAddress.getByName("127.0.0.1"), + Lists.newArrayList(cfs), + FULL_RANGE, + true,0, + true, + PreviewKind.NONE); + CompactionManager.instance.performAnticompaction(result.cfs, FULL_RANGE, result.refs, result.txn, + ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID); + + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org