Repository: cassandra Updated Branches: refs/heads/trunk f8b3a1588 -> ed0a07c38
Improve CompactionManager concurrency Patch by marcuse; reviewed by yukim for CASSANDRA-10099 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ed0a07c3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ed0a07c3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ed0a07c3 Branch: refs/heads/trunk Commit: ed0a07c386658395803886ac5f1cf243cd413cbe Parents: f8b3a15 Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Feb 8 15:56:12 2016 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Mar 15 15:30:38 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compaction/AbstractCompactionStrategy.java | 6 + .../compaction/CompactionStrategyManager.java | 552 +++++++++++++------ .../DateTieredCompactionStrategy.java | 4 +- .../compaction/LeveledCompactionStrategy.java | 2 +- .../SizeTieredCompactionStrategy.java | 4 +- .../SSTableRepairStatusChanged.java | 4 +- .../cassandra/tools/StandaloneScrubber.java | 2 +- .../cassandra/db/lifecycle/TrackerTest.java | 2 +- 9 files changed, 396 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6f1c4a3..4fc03b7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * Improve concurrency in CompactionManager (CASSANDRA-10099) * (cqlsh) interpret CQL type for formatting blobs (CASSANDRA-11274) * Refuse to start and print txn log information in case of disk corruption (CASSANDRA-10112) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index cab56bb..b6d623b 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -315,6 +315,12 @@ public abstract class AbstractCompactionStrategy public abstract void addSSTable(SSTableReader added); + public synchronized void addSSTables(Iterable<SSTableReader> added) + { + for (SSTableReader sstable : added) + addSSTable(sstable); + } + public abstract void removeSSTable(SSTableReader sstable); public static class ScannerList implements AutoCloseable http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index a9d42eb..1d387dc 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -20,8 +20,10 @@ package org.apache.cassandra.db.compaction; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import org.apache.cassandra.index.Index; import com.google.common.primitives.Ints; @@ -60,11 +62,15 @@ public class CompactionStrategyManager implements INotificationConsumer { private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class); private final ColumnFamilyStore cfs; - private volatile List<AbstractCompactionStrategy> repaired = new ArrayList<>(); - private volatile List<AbstractCompactionStrategy> unrepaired = new ArrayList<>(); + private final List<AbstractCompactionStrategy> repaired = new ArrayList<>(); + private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>(); private volatile boolean enabled = true; - public boolean isActive = true; + public volatile boolean isActive = true; private volatile CompactionParams params; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + /* We keep a copy of the schema compaction parameters here to be able to decide if we should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER. @@ -72,7 +78,7 @@ public class CompactionStrategyManager implements INotificationConsumer If a user changes the local compaction strategy and then later ALTERs a compaction parameter, we will use the new compaction parameters. */ - private CompactionParams schemaCompactionParams; + private volatile CompactionParams schemaCompactionParams; private Directories.DataDirectory[] locations; public CompactionStrategyManager(ColumnFamilyStore cfs) @@ -92,22 +98,29 @@ public class CompactionStrategyManager implements INotificationConsumer * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks) * */ - public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) + public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { if (!isEnabled()) return null; maybeReload(cfs.metadata); - - List<AbstractCompactionStrategy> strategies = new ArrayList<>(repaired.size() + unrepaired.size()); - strategies.addAll(repaired); - strategies.addAll(unrepaired); - Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks())); - for (AbstractCompactionStrategy strategy : strategies) + List<AbstractCompactionStrategy> strategies = new ArrayList<>(); + readLock.lock(); + try + { + strategies.addAll(repaired); + strategies.addAll(unrepaired); + Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks())); + for (AbstractCompactionStrategy strategy : strategies) + { + AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore); + if (task != null) + return task; + } + } + finally { - AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore); - if (task != null) - return task; + readLock.unlock(); } return null; } @@ -117,7 +130,7 @@ public class CompactionStrategyManager implements INotificationConsumer return enabled && isActive; } - public synchronized void resume() + public void resume() { isActive = true; } @@ -127,21 +140,28 @@ public class CompactionStrategyManager implements INotificationConsumer * * Separate call from enable/disable to not have to save the enabled-state externally */ - public synchronized void pause() + public void pause() { isActive = false; } - private void startup() { - for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) + writeLock.lock(); + try { - if (sstable.openReason != SSTableReader.OpenReason.EARLY) - getCompactionStrategyFor(sstable).addSSTable(sstable); + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) + { + if (sstable.openReason != SSTableReader.OpenReason.EARLY) + getCompactionStrategyFor(sstable).addSSTable(sstable); + } + repaired.forEach(AbstractCompactionStrategy::startup); + unrepaired.forEach(AbstractCompactionStrategy::startup); + } + finally + { + writeLock.unlock(); } - repaired.forEach(AbstractCompactionStrategy::startup); - unrepaired.forEach(AbstractCompactionStrategy::startup); } /** @@ -154,10 +174,18 @@ public class CompactionStrategyManager implements INotificationConsumer private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable) { int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable); - if (sstable.isRepaired()) - return repaired.get(index); - else - return unrepaired.get(index); + readLock.lock(); + try + { + if (sstable.isRepaired()) + return repaired.get(index); + else + return unrepaired.get(index); + } + finally + { + readLock.unlock(); + } } /** @@ -200,18 +228,35 @@ public class CompactionStrategyManager implements INotificationConsumer public void shutdown() { - isActive = false; - repaired.forEach(AbstractCompactionStrategy::shutdown); - unrepaired.forEach(AbstractCompactionStrategy::shutdown); + writeLock.lock(); + try + { + isActive = false; + repaired.forEach(AbstractCompactionStrategy::shutdown); + unrepaired.forEach(AbstractCompactionStrategy::shutdown); + } + finally + { + writeLock.unlock(); + } } - public synchronized void maybeReload(CFMetaData metadata) + public void maybeReload(CFMetaData metadata) { // compare the old schema configuration to the new one, ignore any locally set changes. if (metadata.params.compaction.equals(schemaCompactionParams) && Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken? return; - reload(metadata); + + writeLock.lock(); + try + { + reload(metadata); + } + finally + { + writeLock.unlock(); + } } /** @@ -220,7 +265,7 @@ public class CompactionStrategyManager implements INotificationConsumer * Called after changing configuration and at startup. * @param metadata */ - public synchronized void reload(CFMetaData metadata) + private void reload(CFMetaData metadata) { boolean disabledWithJMX = !enabled && shouldBeEnabled(); if (!metadata.params.compaction.equals(schemaCompactionParams)) @@ -247,34 +292,50 @@ public class CompactionStrategyManager implements INotificationConsumer public int getUnleveledSSTables() { - if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy) + readLock.lock(); + try { - int count = 0; - for (AbstractCompactionStrategy strategy : repaired) - count += ((LeveledCompactionStrategy)strategy).getLevelSize(0); - for (AbstractCompactionStrategy strategy : unrepaired) - count += ((LeveledCompactionStrategy)strategy).getLevelSize(0); - return count; + if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy) + { + int count = 0; + for (AbstractCompactionStrategy strategy : repaired) + count += ((LeveledCompactionStrategy) strategy).getLevelSize(0); + for (AbstractCompactionStrategy strategy : unrepaired) + count += ((LeveledCompactionStrategy) strategy).getLevelSize(0); + return count; + } + } + finally + { + readLock.unlock(); } return 0; } - public synchronized int[] getSSTableCountPerLevel() + public int[] getSSTableCountPerLevel() { - if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy) + readLock.lock(); + try { - int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT]; - for (AbstractCompactionStrategy strategy : repaired) + if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy) { - int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize(); - res = sumArrays(res, repairedCountPerLevel); - } - for (AbstractCompactionStrategy strategy : unrepaired) - { - int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize(); - res = sumArrays(res, unrepairedCountPerLevel); + int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT]; + for (AbstractCompactionStrategy strategy : repaired) + { + int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize(); + res = sumArrays(res, repairedCountPerLevel); + } + for (AbstractCompactionStrategy strategy : unrepaired) + { + int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize(); + res = sumArrays(res, unrepairedCountPerLevel); + } + return res; } - return res; + } + finally + { + readLock.unlock(); } return null; } @@ -296,85 +357,112 @@ public class CompactionStrategyManager implements INotificationConsumer public boolean shouldDefragment() { - assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass()); - return repaired.get(0).shouldDefragment(); + readLock.lock(); + try + { + assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass()); + return repaired.get(0).shouldDefragment(); + } + finally + { + readLock.unlock(); + } } public Directories getDirectories() { - assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass()); - return repaired.get(0).getDirectories(); + readLock.lock(); + try + { + assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass()); + return repaired.get(0).getDirectories(); + } + finally + { + readLock.unlock(); + } } - public synchronized void handleNotification(INotification notification, Object sender) + private void handleFlushNotification(Iterable<SSTableReader> added) { - maybeReload(cfs.metadata); - if (notification instanceof SSTableAddedNotification) + readLock.lock(); + try { - SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification; - for (SSTableReader sstable : flushedNotification.added) + for (SSTableReader sstable : added) getCompactionStrategyFor(sstable).addSSTable(sstable); } - else if (notification instanceof SSTableListChangedNotification) + finally { - // a bit of gymnastics to be able to replace sstables in compaction strategies - // we use this to know that a compaction finished and where to start the next compaction in LCS - SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; + readLock.unlock(); + } + } - Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations(); - int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1; + private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed) + { + // a bit of gymnastics to be able to replace sstables in compaction strategies + // we use this to know that a compaction finished and where to start the next compaction in LCS + Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations(); + int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1; - List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize); - List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize); - List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize); - List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize); + List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize); + List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize); + List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize); + List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize); - for (int i = 0; i < locationSize; i++) - { - repairedRemoved.add(new HashSet<>()); - repairedAdded.add(new HashSet<>()); - unrepairedRemoved.add(new HashSet<>()); - unrepairedAdded.add(new HashSet<>()); - } - - for (SSTableReader sstable : listChangedNotification.removed) - { - int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable); - if (sstable.isRepaired()) - repairedRemoved.get(i).add(sstable); - else - unrepairedRemoved.get(i).add(sstable); - } - for (SSTableReader sstable : listChangedNotification.added) - { - int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable); - if (sstable.isRepaired()) - repairedAdded.get(i).add(sstable); - else - unrepairedAdded.get(i).add(sstable); - } + for (int i = 0; i < locationSize; i++) + { + repairedRemoved.add(new HashSet<>()); + repairedAdded.add(new HashSet<>()); + unrepairedRemoved.add(new HashSet<>()); + unrepairedAdded.add(new HashSet<>()); + } + for (SSTableReader sstable : removed) + { + int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + if (sstable.isRepaired()) + repairedRemoved.get(i).add(sstable); + else + unrepairedRemoved.get(i).add(sstable); + } + for (SSTableReader sstable : added) + { + int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable); + if (sstable.isRepaired()) + repairedAdded.get(i).add(sstable); + else + unrepairedAdded.get(i).add(sstable); + } + // we need write lock here since we might be moving sstables between strategies + writeLock.lock(); + try + { for (int i = 0; i < locationSize; i++) { if (!repairedRemoved.get(i).isEmpty()) repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i)); else - { - for (SSTableReader sstable : repairedAdded.get(i)) - repaired.get(i).addSSTable(sstable); - } + repaired.get(i).addSSTables(repairedAdded.get(i)); + if (!unrepairedRemoved.get(i).isEmpty()) unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i)); else - { - for (SSTableReader sstable : unrepairedAdded.get(i)) - unrepaired.get(i).addSSTable(sstable); - } + unrepaired.get(i).addSSTables(unrepairedAdded.get(i)); } } - else if (notification instanceof SSTableRepairStatusChanged) + finally + { + writeLock.unlock(); + } + } + + private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables) + { + // we need a write lock here since we move sstables from one strategy instance to another + writeLock.lock(); + try { - for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable) + for (SSTableReader sstable : sstables) { int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable); if (sstable.isRepaired()) @@ -389,31 +477,81 @@ public class CompactionStrategyManager implements INotificationConsumer } } } + finally + { + writeLock.unlock(); + } + } + + private void handleDeletingNotification(SSTableReader deleted) + { + readLock.lock(); + try + { + getCompactionStrategyFor(deleted).removeSSTable(deleted); + } + finally + { + readLock.unlock(); + } + } + + public void handleNotification(INotification notification, Object sender) + { + maybeReload(cfs.metadata); + if (notification instanceof SSTableAddedNotification) + { + handleFlushNotification(((SSTableAddedNotification) notification).added); + } + else if (notification instanceof SSTableListChangedNotification) + { + SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; + handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed); + } + else if (notification instanceof SSTableRepairStatusChanged) + { + handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables); + } else if (notification instanceof SSTableDeletingNotification) { - SSTableReader sstable = ((SSTableDeletingNotification) notification).deleting; - getCompactionStrategyFor(sstable).removeSSTable(sstable); + handleDeletingNotification(((SSTableDeletingNotification) notification).deleting); } } public void enable() { - if (repaired != null) - repaired.forEach(AbstractCompactionStrategy::enable); - if (unrepaired != null) - unrepaired.forEach(AbstractCompactionStrategy::enable); - // enable this last to make sure the strategies are ready to get calls. - enabled = true; + writeLock.lock(); + try + { + if (repaired != null) + repaired.forEach(AbstractCompactionStrategy::enable); + if (unrepaired != null) + unrepaired.forEach(AbstractCompactionStrategy::enable); + // enable this last to make sure the strategies are ready to get calls. + enabled = true; + } + finally + { + writeLock.unlock(); + } } public void disable() { - // disable this first avoid asking disabled strategies for compaction tasks - enabled = false; - if (repaired != null) - repaired.forEach(AbstractCompactionStrategy::disable); - if (unrepaired != null) - unrepaired.forEach(AbstractCompactionStrategy::disable); + writeLock.lock(); + try + { + // disable this first avoid asking disabled strategies for compaction tasks + enabled = false; + if (repaired != null) + repaired.forEach(AbstractCompactionStrategy::disable); + if (unrepaired != null) + unrepaired.forEach(AbstractCompactionStrategy::disable); + } + finally + { + writeLock.unlock(); + } } /** @@ -425,7 +563,7 @@ public class CompactionStrategyManager implements INotificationConsumer * @return */ @SuppressWarnings("resource") - public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) + public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { assert repaired.size() == unrepaired.size(); List<Set<SSTableReader>> repairedSSTables = new ArrayList<>(); @@ -447,68 +585,92 @@ public class CompactionStrategyManager implements INotificationConsumer List<ISSTableScanner> scanners = new ArrayList<>(sstables.size()); - for (Range<Token> range : ranges) + readLock.lock(); + try { - List<ISSTableScanner> repairedScanners = new ArrayList<>(); - List<ISSTableScanner> unrepairedScanners = new ArrayList<>(); - - for (int i = 0; i < repairedSSTables.size(); i++) - { - if (!repairedSSTables.get(i).isEmpty()) - repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners); - } - for (int i = 0; i < unrepairedSSTables.size(); i++) + for (Range<Token> range : ranges) { - if (!unrepairedSSTables.get(i).isEmpty()) - scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners); - } - for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners)) - { - if (!scanners.add(scanner)) - scanner.close(); + List<ISSTableScanner> repairedScanners = new ArrayList<>(); + List<ISSTableScanner> unrepairedScanners = new ArrayList<>(); + + for (int i = 0; i < repairedSSTables.size(); i++) + { + if (!repairedSSTables.get(i).isEmpty()) + repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners); + } + for (int i = 0; i < unrepairedSSTables.size(); i++) + { + if (!unrepairedSSTables.get(i).isEmpty()) + scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners); + } + for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners)) + { + if (!scanners.add(scanner)) + scanner.close(); + } } + return new AbstractCompactionStrategy.ScannerList(scanners); + } + finally + { + readLock.unlock(); } - return new AbstractCompactionStrategy.ScannerList(scanners); } - public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables) + public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables) { return getScanners(sstables, Collections.singleton(null)); } public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) { - Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); - Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>(); + readLock.lock(); + try + { + Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); + Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>(); - for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet()) - anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue())); - return anticompactionGroups; + for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet()) + anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue())); + return anticompactionGroups; + } + finally + { + readLock.unlock(); + } } public long getMaxSSTableBytes() { - return unrepaired.get(0).getMaxSSTableBytes(); + readLock.lock(); + try + { + return unrepaired.get(0).getMaxSSTableBytes(); + } + finally + { + readLock.unlock(); + } } public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes) { maybeReload(cfs.metadata); - validateForCompaction(txn.originals()); + validateForCompaction(txn.originals(), cfs, getDirectories()); return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes); } - private void validateForCompaction(Iterable<SSTableReader> input) + private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories) { SSTableReader firstSSTable = Iterables.getFirst(input, null); assert firstSSTable != null; boolean repaired = firstSSTable.isRepaired(); - int firstIndex = getCompactionStrategyIndex(cfs, getDirectories(), firstSSTable); + int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable); for (SSTableReader sstable : input) { if (sstable.isRepaired() != repaired) throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction"); - if (firstIndex != getCompactionStrategyIndex(cfs, getDirectories(), sstable)) + if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable)) throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction"); } } @@ -524,9 +686,10 @@ public class CompactionStrategyManager implements INotificationConsumer @Override public Collection<AbstractCompactionTask> call() throws Exception { - synchronized (CompactionStrategyManager.this) + List<AbstractCompactionTask> tasks = new ArrayList<>(); + readLock.lock(); + try { - List<AbstractCompactionTask> tasks = new ArrayList<>(); for (AbstractCompactionStrategy strategy : repaired) { Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput); @@ -539,10 +702,14 @@ public class CompactionStrategyManager implements INotificationConsumer if (task != null) tasks.addAll(task); } - if (tasks.isEmpty()) - return null; - return tasks; } + finally + { + readLock.unlock(); + } + if (tasks.isEmpty()) + return null; + return tasks; } }, false, false); } @@ -550,18 +717,34 @@ public class CompactionStrategyManager implements INotificationConsumer public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) { maybeReload(cfs.metadata); - validateForCompaction(sstables); - return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore); + validateForCompaction(sstables, cfs, getDirectories()); + readLock.lock(); + try + { + return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore); + } + finally + { + readLock.unlock(); + } } public int getEstimatedRemainingTasks() { int tasks = 0; - for (AbstractCompactionStrategy strategy : repaired) - tasks += strategy.getEstimatedRemainingTasks(); - for (AbstractCompactionStrategy strategy : unrepaired) - tasks += strategy.getEstimatedRemainingTasks(); + readLock.lock(); + try + { + for (AbstractCompactionStrategy strategy : repaired) + tasks += strategy.getEstimatedRemainingTasks(); + for (AbstractCompactionStrategy strategy : unrepaired) + tasks += strategy.getEstimatedRemainingTasks(); + } + finally + { + readLock.unlock(); + } return tasks; } @@ -572,23 +755,40 @@ public class CompactionStrategyManager implements INotificationConsumer public String getName() { - return unrepaired.get(0).getName(); + readLock.lock(); + try + { + return unrepaired.get(0).getName(); + } + finally + { + readLock.unlock(); + } } - public List<List<AbstractCompactionStrategy>> getStrategies() + @VisibleForTesting + List<List<AbstractCompactionStrategy>> getStrategies() { return Arrays.asList(repaired, unrepaired); } - public synchronized void setNewLocalCompactionStrategy(CompactionParams params) + public void setNewLocalCompactionStrategy(CompactionParams params) { logger.info("Switching local compaction strategy from {} to {}}", this.params, params); - setStrategy(params); - if (shouldBeEnabled()) - enable(); - else - disable(); - startup(); + writeLock.lock(); + try + { + setStrategy(params); + if (shouldBeEnabled()) + enable(); + else + disable(); + startup(); + } + finally + { + writeLock.unlock(); + } } private void setStrategy(CompactionParams params) @@ -633,13 +833,21 @@ public class CompactionStrategyManager implements INotificationConsumer Collection<Index> indexes, LifecycleTransaction txn) { - if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + readLock.lock(); + try { - return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); + if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + { + return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); + } + else + { + return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); + } } - else + finally { - return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); + readLock.unlock(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 248473c..9a17e06 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -63,7 +63,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy @Override @SuppressWarnings("resource") - public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) + public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { while (true) { @@ -83,7 +83,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy * @param gcBefore * @return */ - private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) + private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) { if (sstables.isEmpty()) return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 953971a..84ddbbc 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -90,7 +90,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy * (by explicit user request) even when compaction is disabled. */ @SuppressWarnings("resource") - public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) + public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { while (true) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index e36adf2..28bdf5c 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -75,7 +75,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy this.sizeTieredOptions = new SizeTieredCompactionStrategyOptions(options); } - private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) + private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) { // make local copies so they can't be changed out from under us mid-method int minThreshold = cfs.getMinimumCompactionThreshold(); @@ -175,7 +175,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy } @SuppressWarnings("resource") - public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) + public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { while (true) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java index d1398bc..8c48fa8 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java +++ b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java @@ -24,10 +24,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; public class SSTableRepairStatusChanged implements INotification { - public final Collection<SSTableReader> sstable; + public final Collection<SSTableReader> sstables; public SSTableRepairStatusChanged(Collection<SSTableReader> repairStatusChanged) { - this.sstable = repairStatusChanged; + this.sstables = repairStatusChanged; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index 4249430..42772ef 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -161,7 +161,7 @@ public class StandaloneScrubber private static void checkManifest(CompactionStrategyManager strategyManager, ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L)); - if (strategyManager.getStrategies().size() == 2 && strategyManager.getStrategies().get(0) instanceof LeveledCompactionStrategy) + if (strategyManager.getCompactionParams().klass().equals(LeveledCompactionStrategy.class)) { System.out.println("Checking leveled manifest"); Predicate<SSTableReader> repairedPredicate = new Predicate<SSTableReader>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index b3dc3d9..0294115 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -349,7 +349,7 @@ public class TrackerTest Assert.assertEquals(singleton(r2), ((SSTableListChangedNotification) listener.received.get(0)).added); listener.received.clear(); tracker.notifySSTableRepairedStatusChanged(singleton(r1)); - Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) listener.received.get(0)).sstable); + Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) listener.received.get(0)).sstables); listener.received.clear(); Memtable memtable = MockSchema.memtable(cfs); tracker.notifyRenewed(memtable);