Repository: cassandra
Updated Branches:
  refs/heads/trunk 37771f31b -> 7d4d1a325
Rework CSM.getScanners synchronization

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13786

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d4d1a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d4d1a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d4d1a32

Branch: refs/heads/trunk
Commit: 7d4d1a32581ff40ed1049833631832054bcf2316
Parents: 37771f3
Author: Blake Eggleston <[email protected]>
Authored: Thu Aug 24 13:00:44 2017 -0700
Committer: Blake Eggleston <[email protected]>
Committed: Tue Sep 12 13:31:47 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 21 +++-----
 .../compaction/CompactionStrategyManager.java   | 56 ++++++++++++++++----
 .../db/compaction/PendingRepairManager.java     |  9 +---
 4 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d4d1a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5dc68d..ebe0dc0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
  * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
  * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
  * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d4d1a32/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 722a5d0..5619da7 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -642,17 +642,12 @@ public class CompactionManager implements CompactionManagerMBean logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size()); logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges); Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); - Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); - // we should only notify that repair status changed if it actually did: - Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>(); - Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>(); - for (SSTableReader sstable : sstables) - wasRepairedBefore.put(sstable, sstable.isRepaired()); Set<SSTableReader> nonAnticompacting = new HashSet<>(); Iterator<SSTableReader> sstableIterator = sstables.iterator(); List<Range<Token>> normalizedRanges = Range.normalize(ranges); + Set<SSTableReader> fullyContainedSSTables = new HashSet<>(); while (sstableIterator.hasNext()) { @@ -667,11 +662,7 @@ public class CompactionManager implements CompactionManagerMBean if (r.contains(sstableRange)) { logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r); - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair); - sstable.reloadSSTableMetadata(); - mutatedRepairStatuses.add(sstable); - if (!wasRepairedBefore.get(sstable)) - mutatedRepairStatusToNotify.add(sstable); + fullyContainedSSTables.add(sstable); sstableIterator.remove(); shouldAnticompact = true; break; @@ -690,10 +681,10 @@ public class 
CompactionManager implements CompactionManagerMBean sstableIterator.remove(); } } - cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(mutatedRepairStatuses)); - cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify); - txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses)); - validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses)); + cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables)); + cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, repairedAt, pendingRepair); + txn.cancel(Sets.union(nonAnticompacting, fullyContainedSSTables)); + validatedForRepair.release(Sets.union(nonAnticompacting, fullyContainedSSTables)); assert txn.originals().equals(sstables); if (!sstables.isEmpty()) doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d4d1a32/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 3b1bc41..9192b70 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.compaction; +import java.io.IOException; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -754,22 +755,22 @@ public class CompactionStrategyManager implements INotificationConsumer unrepairedSSTables.add(new HashSet<>()); } - for (SSTableReader sstable : sstables) - { - int idx = getCompactionStrategyIndex(cfs, getDirectories(), sstable); - if (sstable.isPendingRepair()) - 
pendingSSTables.get(idx).add(sstable); - else if (sstable.isRepaired()) - repairedSSTables.get(idx).add(sstable); - else - unrepairedSSTables.get(idx).add(sstable); - } - List<ISSTableScanner> scanners = new ArrayList<>(sstables.size()); readLock.lock(); try { + for (SSTableReader sstable : sstables) + { + int idx = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + if (sstable.isPendingRepair()) + pendingSSTables.get(idx).add(sstable); + else if (sstable.isRepaired()) + repairedSSTables.get(idx).add(sstable); + else + unrepairedSSTables.get(idx).add(sstable); + } + for (int i = 0; i < pendingSSTables.size(); i++) { if (!pendingSSTables.get(i).isEmpty()) @@ -1163,4 +1164,37 @@ public class CompactionStrategyManager implements INotificationConsumer { return pendingRepairs; } + + /** + * Mutates sstable repairedAt times and notifies listeners of the change with the writeLock held. Prevents races + * with other processes between when the metadata is changed and when sstables are moved between strategies. 
+ */ + public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair) throws IOException + { + Set<SSTableReader> changed = new HashSet<>(); + + writeLock.lock(); + try + { + for (SSTableReader sstable: sstables) + { + sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair); + sstable.reloadSSTableMetadata(); + changed.add(sstable); + } + } + finally + { + try + { + // if there was an exception mutating repairedAt, we should still notify for the + // sstables that we were able to modify successfully before releasing the lock + cfs.getTracker().notifySSTableRepairedStatusChanged(changed); + } + finally + { + writeLock.unlock(); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d4d1a32/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index 8ee6025..98acbdb 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@ -436,18 +436,11 @@ class PendingRepairManager try { logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID); - for (SSTableReader sstable : transaction.originals()) - { - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, ActiveRepairService.NO_PENDING_REPAIR); - sstable.reloadSSTableMetadata(); - } + cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR); completed = true; } finally { - // even if we weren't able to rewrite all the sstable metedata, we should still move the ones that were - 
cfs.getTracker().notifySSTableRepairedStatusChanged(transaction.originals()); - // we always abort because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll // anything back. Also, we don't want to obsolete the originals. We're only using it to prevent other // compactions from marking these sstables compacting, and unmarking them when we're done --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
