Repository: cassandra Updated Branches: refs/heads/trunk 13ecf33d9 -> 7df3a5c99
Remove wrapping compaction strategy Patch by marcuse; reviewed by JoshuaMcKenzie for CASSANDRA-9342 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7df3a5c9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7df3a5c9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7df3a5c9 Branch: refs/heads/trunk Commit: 7df3a5c99da4a352b3b8599b2965a86dec1777e5 Parents: 13ecf33 Author: Marcus Eriksson <[email protected]> Authored: Mon May 11 13:43:13 2015 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Fri Jun 5 10:34:04 2015 +0200 ---------------------------------------------------------------------- .../org/apache/cassandra/config/CFMetaData.java | 2 - .../cassandra/db/CollationController.java | 2 +- .../apache/cassandra/db/ColumnFamilyStore.java | 35 +- src/java/org/apache/cassandra/db/Keyspace.java | 2 +- .../db/compaction/CompactionManager.java | 16 +- .../compaction/CompactionStrategyManager.java | 463 +++++++++++++++++++ .../cassandra/db/compaction/CompactionTask.java | 5 +- .../db/compaction/LeveledManifest.java | 19 +- .../cassandra/db/compaction/Upgrader.java | 8 +- .../compaction/WrappingCompactionStrategy.java | 374 --------------- .../cassandra/metrics/ColumnFamilyMetrics.java | 2 +- .../cassandra/metrics/CompactionMetrics.java | 2 +- .../cassandra/service/CassandraDaemon.java | 2 +- .../cassandra/tools/StandaloneScrubber.java | 9 +- .../LongLeveledCompactionStrategyTest.java | 3 +- test/unit/org/apache/cassandra/Util.java | 2 +- .../compaction/CompactionAwareWriterTest.java | 2 +- .../db/compaction/CompactionsPurgeTest.java | 4 +- .../db/compaction/CompactionsTest.java | 4 +- .../LeveledCompactionStrategyTest.java | 16 +- .../io/sstable/SSTableMetadataTest.java | 1 - .../io/sstable/SSTableRewriterTest.java | 8 +- 22 files changed, 533 insertions(+), 448 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index d8eeaf2..6bff44d 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -854,8 +854,6 @@ public final class CFMetaData { className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className; Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy"); - if (className.equals(WrappingCompactionStrategy.class.getName())) - throw new ConfigurationException("You can't set WrappingCompactionStrategy as the compaction strategy!"); if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass)) throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index 1e9a56d..7f6d439 100644 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -149,7 +149,7 @@ public class CollationController // "hoist up" the requested data into a more recent sstable if (sstablesIterated > cfs.getMinimumCompactionThreshold() && !cfs.isAutoCompactionDisabled() - && cfs.getCompactionStrategy().shouldDefragment()) + && cfs.getCompactionStrategyManager().shouldDefragment()) { // !!WARNING!! if we stop copying our data to a heap-managed object, // we will need to track the lifetime of this mutation as well http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 865bac9..2f6dce1 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -173,7 +173,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /* These are locally held copies to be changed from the config during runtime */ private volatile DefaultInteger minCompactionThreshold; private volatile DefaultInteger maxCompactionThreshold; - private final WrappingCompactionStrategy compactionStrategyWrapper; + private final CompactionStrategyManager compactionStrategyManager; public final Directories directories; @@ -199,7 +199,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (ColumnFamilyStore cfs : concatWithIndexes()) cfs.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold()); - compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata); + compactionStrategyManager.maybeReload(metadata); scheduleFlush(); @@ -250,7 +250,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass); - compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata); + compactionStrategyManager.maybeReload(metadata); } catch (ConfigurationException e) { @@ -348,12 +348,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean CacheService.instance.keyCache.loadSaved(this); // compaction strategy should be created after the CFS has been prepared - this.compactionStrategyWrapper = new WrappingCompactionStrategy(this); + this.compactionStrategyManager = new CompactionStrategyManager(this); if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0) { logger.warn("Disabling compaction strategy by setting compaction thresholds to 0 is deprecated, set the compaction option 'enabled' to 'false' instead."); - this.compactionStrategyWrapper.disable(); + this.compactionStrategyManager.disable(); } // create the private ColumnFamilyStores for the secondary column indexes @@ -434,6 +434,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } latencyCalculator.cancel(false); + compactionStrategyManager.shutdown(); SystemKeyspace.removeTruncationRecord(metadata.cfId); data.dropSSTables(); indexManager.invalidate(); @@ -1499,7 +1500,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean void replaceFlushed(Memtable memtable, SSTableReader sstable) { - compactionStrategyWrapper.replaceFlushed(memtable, sstable); + compactionStrategyManager.replaceFlushed(memtable, sstable); } public boolean isValid() @@ -1832,7 +1833,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { public List<SSTableReader> apply(View view) { - return compactionStrategyWrapper.filterSSTablesForReads(view.intervalTree.search(key)); + return compactionStrategyManager.filterSSTablesForReads(view.intervalTree.search(key)); } }; } @@ -1847,7 +1848,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { public List<SSTableReader> apply(View view) { - return compactionStrategyWrapper.filterSSTablesForReads(view.sstablesInBounds(rowBounds)); + return compactionStrategyManager.filterSSTablesForReads(view.sstablesInBounds(rowBounds)); } }; } @@ -2551,7 +2552,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes(); for (ColumnFamilyStore cfs : selfWithIndexes) - cfs.getCompactionStrategy().pause(); + cfs.getCompactionStrategyManager().pause(); try { // interrupt in-progress compactions @@ -2582,7 +2583,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean finally { for (ColumnFamilyStore cfs : selfWithIndexes) - cfs.getCompactionStrategy().resume(); + cfs.getCompactionStrategyManager().resume(); } } } @@ -2620,7 +2621,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { // we don't use CompactionStrategy.pause since we don't want users flipping that on and off // during runWithCompactionsDisabled - this.compactionStrategyWrapper.disable(); + compactionStrategyManager.disable(); } public void enableAutoCompaction() @@ -2635,7 +2636,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean @VisibleForTesting public void enableAutoCompaction(boolean waitForFutures) { - this.compactionStrategyWrapper.enable(); + compactionStrategyManager.enable(); List<Future<?>> futures = CompactionManager.instance.submitBackground(this); if (waitForFutures) FBUtilities.waitOnFutures(futures); @@ -2643,7 +2644,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public boolean isAutoCompactionDisabled() { - return !this.compactionStrategyWrapper.isEnabled(); + return !this.compactionStrategyManager.isEnabled(); } /* @@ -2655,9 +2656,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean - get/set memtime */ - public AbstractCompactionStrategy getCompactionStrategy() + public CompactionStrategyManager getCompactionStrategyManager() { - return compactionStrategyWrapper; + return compactionStrategyManager; } public void setCompactionThresholds(int minThreshold, int maxThreshold) @@ -2745,12 +2746,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public int getUnleveledSSTables() { - return this.compactionStrategyWrapper.getUnleveledSSTables(); + return this.compactionStrategyManager.getUnleveledSSTables(); } public int[] getSSTableCountPerLevel() { - return compactionStrategyWrapper.getSSTableCountPerLevel(); + return compactionStrategyManager.getSSTableCountPerLevel(); } public static class ViewFragment http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 1d86784..cb5c54d 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -304,7 +304,7 @@ public class Keyspace if (cfs == null) return; - cfs.getCompactionStrategy().shutdown(); + cfs.getCompactionStrategyManager().shutdown(); CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), true); // wait for any outstanding reads/writes that might affect the CFS cfs.keyspace.writeOrder.awaitNewBarrier(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index a2783da..21c3f50 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -176,7 +176,7 @@ public class CompactionManager implements CompactionManagerMBean logger.debug("Scheduling a background task check for {}.{} with {}", cfs.keyspace.getName(), cfs.name, - cfs.getCompactionStrategy().getName()); + cfs.getCompactionStrategyManager().getName()); List<Future<?>> futures = new ArrayList<Future<?>>(); // we must schedule it at least once, otherwise compaction will stop for a CF until next flush do { @@ -229,7 +229,7 @@ public class CompactionManager implements CompactionManagerMBean return; } - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs)); if (task == null) { @@ -390,7 +390,7 @@ public class CompactionManager implements CompactionManagerMBean @Override public void execute(LifecycleTransaction txn) throws IOException { - AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(txn, NO_GC, Long.MAX_VALUE); + AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE); task.setUserDefined(true); task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(metrics); @@ -546,7 +546,7 @@ public class CompactionManager implements CompactionManagerMBean // here we compute the task off the compaction executor, so having that present doesn't // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting // for ourselves to finish/acknowledge cancellation before continuing. - final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategy().getMaximalTask(gcBefore, splitOutput); + final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput); if (tasks == null) return Collections.emptyList(); @@ -626,7 +626,7 @@ public class CompactionManager implements CompactionManagerMBean } else { - AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore); + AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore); if (task != null) task.execute(metrics); } @@ -1096,7 +1096,7 @@ public class CompactionManager implements CompactionManagerMBean MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); long start = System.nanoTime(); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range)) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range)) { CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore); Iterator<AbstractCompactedRow> iter = ci.iterator(); @@ -1158,7 +1158,7 @@ public class CompactionManager implements CompactionManagerMBean logger.info("Performing anticompaction on {} sstables", repaired.originals().size()); //Group SSTables - Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals()); + Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(repaired.originals()); // iterate over sstables to check if the repaired / unrepaired ranges intersect them. int antiCompactedSSTableCount = 0; for (Collection<SSTableReader> sstableGroup : groupedSSTables) @@ -1206,7 +1206,7 @@ public class CompactionManager implements CompactionManagerMBean File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); long repairedKeyCount = 0; long unrepairedKeyCount = 0; - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java new file mode 100644 index 0000000..38fb09f --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; + +import com.google.common.collect.Iterables; +import com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.notifications.INotification; +import org.apache.cassandra.notifications.INotificationConsumer; +import org.apache.cassandra.notifications.SSTableAddedNotification; +import org.apache.cassandra.notifications.SSTableDeletingNotification; +import org.apache.cassandra.notifications.SSTableListChangedNotification; +import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.service.StorageService; + +/** + * Manages the compaction strategies. + * + * Currently has two instances of actual compaction strategies - one for repaired data and one for + * unrepaired data. This is done to be able to totally separate the different sets of sstables. + */ +public class CompactionStrategyManager implements INotificationConsumer +{ + protected static final String COMPACTION_ENABLED = "enabled"; + private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class); + private final ColumnFamilyStore cfs; + private volatile AbstractCompactionStrategy repaired; + private volatile AbstractCompactionStrategy unrepaired; + private volatile boolean enabled = true; + public boolean isActive = true; + + public CompactionStrategyManager(ColumnFamilyStore cfs) + { + cfs.getTracker().subscribe(this); + logger.debug("{} subscribed to the data tracker.", this); + this.cfs = cfs; + reload(cfs.metadata); + String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED); + enabled = optionValue == null || Boolean.parseBoolean(optionValue); + } + + /** + * Return the next background task + * + * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks) + * + */ + public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) + { + if (!isEnabled()) + return null; + + maybeReload(cfs.metadata); + + if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks()) + { + AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore); + if (repairedTask != null) + return repairedTask; + return unrepaired.getNextBackgroundTask(gcBefore); + } + else + { + AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore); + if (unrepairedTask != null) + return unrepairedTask; + return repaired.getNextBackgroundTask(gcBefore); + } + } + + /** + * Disable compaction - used with nodetool disableautocompaction for example + */ + public void disable() + { + enabled = false; + } + + /** + * re-enable disabled compaction. + */ + public void enable() + { + enabled = true; + } + + public boolean isEnabled() + { + return enabled && isActive; + } + + public synchronized void resume() + { + isActive = true; + } + + /** + * pause compaction while we cancel all ongoing compactions + * + * Separate call from enable/disable to not have to save the enabled-state externally + */ + public synchronized void pause() + { + isActive = false; + } + + + private void startup() + { + for (SSTableReader sstable : cfs.getSSTables()) + { + if (sstable.openReason != SSTableReader.OpenReason.EARLY) + getCompactionStrategyFor(sstable).addSSTable(sstable); + } + repaired.startup(); + unrepaired.startup(); + } + + /** + * return the compaction strategy for the given sstable + * + * returns differently based on the repaired status + * @param sstable + * @return + */ + private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable) + { + if (sstable.isRepaired()) + return repaired; + else + return unrepaired; + } + + public void shutdown() + { + isActive = false; + repaired.shutdown(); + unrepaired.shutdown(); + } + + + public synchronized void maybeReload(CFMetaData metadata) + { + if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass) + && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass) + && repaired.options.equals(metadata.compactionStrategyOptions) // todo: assumes all have the same options + && unrepaired.options.equals(metadata.compactionStrategyOptions)) + return; + + reload(metadata); + } + + /** + * Reload the compaction strategies + * + * Called after changing configuration and at startup. + * @param metadata + */ + public synchronized void reload(CFMetaData metadata) + { + if (repaired != null) + repaired.shutdown(); + if (unrepaired != null) + unrepaired.shutdown(); + repaired = metadata.createCompactionStrategyInstance(cfs); + unrepaired = metadata.createCompactionStrategyInstance(cfs); + startup(); + } + + public void replaceFlushed(Memtable memtable, SSTableReader sstable) + { + cfs.getTracker().replaceFlushed(memtable, sstable); + if (sstable != null) + CompactionManager.instance.submitBackground(cfs); + } + + /** + * TODO: remove, unused + */ + public List<SSTableReader> filterSSTablesForReads(List<SSTableReader> sstables) + { + // todo: union of filtered sstables or intersection? + return unrepaired.filterSSTablesForReads(repaired.filterSSTablesForReads(sstables)); + } + + public int getUnleveledSSTables() + { + if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy) + { + int count = 0; + count += ((LeveledCompactionStrategy)repaired).getLevelSize(0); + count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0); + return count; + } + return 0; + } + + public synchronized int[] getSSTableCountPerLevel() + { + if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy) + { + int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT]; + int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize(); + res = sumArrays(res, repairedCountPerLevel); + int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize(); + res = sumArrays(res, unrepairedCountPerLevel); + return res; + } + return null; + } + + private static int[] sumArrays(int[] a, int[] b) + { + int[] res = new int[Math.max(a.length, b.length)]; + for (int i = 0; i < res.length; i++) + { + if (i < a.length && i < b.length) + res[i] = a[i] + b[i]; + else if (i < a.length) + res[i] = a[i]; + else + res[i] = b[i]; + } + return res; + } + + public boolean shouldDefragment() + { + assert repaired.getClass().equals(unrepaired.getClass()); + return repaired.shouldDefragment(); + } + + + public synchronized void handleNotification(INotification notification, Object sender) + { + if (notification instanceof SSTableAddedNotification) + { + SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification; + if (flushedNotification.added.isRepaired()) + repaired.addSSTable(flushedNotification.added); + else + unrepaired.addSSTable(flushedNotification.added); + } + else if (notification instanceof SSTableListChangedNotification) + { + SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; + Set<SSTableReader> repairedRemoved = new HashSet<>(); + Set<SSTableReader> repairedAdded = new HashSet<>(); + Set<SSTableReader> unrepairedRemoved = new HashSet<>(); + Set<SSTableReader> unrepairedAdded = new HashSet<>(); + + for (SSTableReader sstable : listChangedNotification.removed) + { + if (sstable.isRepaired()) + repairedRemoved.add(sstable); + else + unrepairedRemoved.add(sstable); + } + for (SSTableReader sstable : listChangedNotification.added) + { + if (sstable.isRepaired()) + repairedAdded.add(sstable); + else + unrepairedAdded.add(sstable); + } + if (!repairedRemoved.isEmpty()) + { + repaired.replaceSSTables(repairedRemoved, repairedAdded); + } + else + { + for (SSTableReader sstable : repairedAdded) + repaired.addSSTable(sstable); + } + + if (!unrepairedRemoved.isEmpty()) + { + unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded); + } + else + { + for (SSTableReader sstable : unrepairedAdded) + unrepaired.addSSTable(sstable); + } + } + else if (notification instanceof SSTableRepairStatusChanged) + { + for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable) + { + if (sstable.isRepaired()) + { + unrepaired.removeSSTable(sstable); + repaired.addSSTable(sstable); + } + else + { + repaired.removeSSTable(sstable); + unrepaired.addSSTable(sstable); + } + } + } + else if (notification instanceof SSTableDeletingNotification) + { + SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting; + if (sstable.isRepaired()) + repaired.removeSSTable(sstable); + else + unrepaired.removeSSTable(sstable); + } + } + + /** + * Create ISSTableScanner from the given sstables + * + * Delegates the call to the compaction strategies to allow LCS to create a scanner + * @param sstables + * @param range + * @return + */ + public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) + { + List<SSTableReader> repairedSSTables = new ArrayList<>(); + List<SSTableReader> unrepairedSSTables = new ArrayList<>(); + + for (SSTableReader sstable : sstables) + { + if (sstable.isRepaired()) + repairedSSTables.add(sstable); + else + unrepairedSSTables.add(sstable); + } + + List<ISSTableScanner> scanners = new ArrayList<>(); + + if (!repairedSSTables.isEmpty()) + scanners.addAll(repaired.getScanners(repairedSSTables, range).scanners); + if (!unrepairedSSTables.isEmpty()) + scanners.addAll(unrepaired.getScanners(unrepairedSSTables, range).scanners); + + return new AbstractCompactionStrategy.ScannerList(scanners); + } + + public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) + { + return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup); + } + + public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables) + { + return getScanners(sstables, null); + } + + public long getMaxSSTableBytes() + { + return unrepaired.getMaxSSTableBytes(); + } + + public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes) + { + return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes); + } + + public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput) + { + // runWithCompactionsDisabled cancels active compactions and disables them, then we are able + // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the + // sstables are marked the compactions are re-enabled + return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>() + { + @Override + public Collection<AbstractCompactionTask> call() throws Exception + { + synchronized (CompactionStrategyManager.this) + { + Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput); + Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput); + + if (repairedTasks == null && unrepairedTasks == null) + return null; + + if (repairedTasks == null) + return unrepairedTasks; + if (unrepairedTasks == null) + return repairedTasks; + + List<AbstractCompactionTask> tasks = new ArrayList<>(); + tasks.addAll(repairedTasks); + tasks.addAll(unrepairedTasks); + return tasks; + } + } + }, false); + } + + public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) + { + return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore); + } + + public int getEstimatedRemainingTasks() + { + int tasks = 0; + tasks += repaired.getEstimatedRemainingTasks(); + tasks += unrepaired.getEstimatedRemainingTasks(); + + return tasks; + } + + public boolean shouldBeEnabled() + { + String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED); + return optionValue == null || Boolean.parseBoolean(optionValue); + } + + public String getName() + { + return unrepaired.getName(); + } + + public List<AbstractCompactionStrategy> getStrategies() + { + return Arrays.asList(repaired, unrepaired); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 7089016..17e55bf 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -105,7 +105,7 @@ public class CompactionTask extends AbstractCompactionTask // Note that the current compaction strategy, is not necessarily the one this task was created under. // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy. - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); if (DatabaseDescriptor.isSnapshotBeforeCompaction()) cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name); @@ -159,6 +159,7 @@ public class CompactionTask extends AbstractCompactionTask // See CASSANDRA-8019 and CASSANDRA-8399 try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) { + ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator()) { @@ -166,7 +167,7 @@ public class CompactionTask extends AbstractCompactionTask collector.beginCompaction(ci); long lastCheckObsoletion = start; - if (!controller.cfs.getCompactionStrategy().isActive) + if (!controller.cfs.getCompactionStrategyManager().isActive) throw new CompactionInterruptedException(ci.getCompactionInfo()); try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 0d0928f..0763316 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -56,7 +56,10 @@ public class LeveledManifest * that level into lower level compactions */ private static final int NO_COMPACTION_LIMIT = 25; - + // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is + // updated, we will still have sstables of the older, potentially smaller size. So don't make this + // dependent on maxSSTableSize.) + public static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000); private final ColumnFamilyStore cfs; @VisibleForTesting protected final List<SSTableReader>[] generations; @@ -71,18 +74,14 @@ public class LeveledManifest this.maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024L * 1024L; this.options = options; - // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is - // updated, we will still have sstables of the older, potentially smaller size. So don't make this - // dependent on maxSSTableSize.) - int n = (int) Math.log10(1000 * 1000 * 1000); - generations = new List[n]; - lastCompactedKeys = new RowPosition[n]; + generations = new List[MAX_LEVEL_COUNT]; + lastCompactedKeys = new RowPosition[MAX_LEVEL_COUNT]; for (int i = 0; i < generations.length; i++) { generations[i] = new ArrayList<>(); lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound(); } - compactionCounter = new int[n]; + compactionCounter = new int[MAX_LEVEL_COUNT]; } public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables) @@ -340,7 +339,7 @@ public class LeveledManifest candidates = getOverlappingStarvedSSTables(nextLevel, candidates); if (logger.isDebugEnabled()) logger.debug("Compaction candidates for L{} are {}", i, toString(candidates)); - return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes()); + return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategyManager().getMaxSSTableBytes()); } else { @@ -355,7 +354,7 @@ public class LeveledManifest Collection<SSTableReader> candidates = getCandidatesFor(0); if (candidates.isEmpty()) return null; - return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes()); + return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategyManager().getMaxSSTableBytes()); } private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index ca975b8..a0cce24 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -43,7 +43,7 @@ public class Upgrader private final OperationType compactionType = OperationType.UPGRADE_SSTABLES; private final CompactionController controller; - private final AbstractCompactionStrategy strategy; + private final CompactionStrategyManager strategyManager; private final long estimatedRows; private final OutputHandler outputHandler; @@ -59,9 +59,9 @@ public class Upgrader this.controller = new UpgradeController(cfs); - this.strategy = cfs.getCompactionStrategy(); + this.strategyManager = cfs.getCompactionStrategyManager(); long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable))); - long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategy.getMaxSSTableBytes()); + long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategyManager.getMaxSSTableBytes()); this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); } @@ -86,7 +86,7 @@ public class Upgrader outputHandler.output("Upgrading " + sstable); try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true); - AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals()); + AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals()); CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator()) { writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java deleted file mode 100644 index adda0c9..0000000 --- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.compaction; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.notifications.INotification; -import org.apache.cassandra.notifications.INotificationConsumer; -import org.apache.cassandra.notifications.SSTableAddedNotification; -import org.apache.cassandra.notifications.SSTableDeletingNotification; -import org.apache.cassandra.notifications.SSTableListChangedNotification; -import org.apache.cassandra.notifications.SSTableRepairStatusChanged; - -public final class WrappingCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer -{ - private static final Logger logger = LoggerFactory.getLogger(WrappingCompactionStrategy.class); - private volatile AbstractCompactionStrategy repaired; - private volatile AbstractCompactionStrategy unrepaired; - public WrappingCompactionStrategy(ColumnFamilyStore cfs) - { - super(cfs, cfs.metadata.compactionStrategyOptions); - reloadCompactionStrategy(cfs.metadata); - cfs.getTracker().subscribe(this); - logger.debug("{} subscribed to the data tracker.", this); - } - - @Override - public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) - { - if (!isEnabled()) - return null; - - if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks()) - { - AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore); - if (repairedTask != null) - return repairedTask; - return unrepaired.getNextBackgroundTask(gcBefore); - } - else - { - AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore); - if (unrepairedTask != null) - return unrepairedTask; - return repaired.getNextBackgroundTask(gcBefore); - } - - } - - @Override - public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, final boolean splitOutput) - { - // runWithCompactionsDisabled cancels active compactions and disables them, then we are able - // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the - // sstables are marked the compactions are re-enabled - return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>() - { - @Override - public Collection<AbstractCompactionTask> call() throws Exception - { - synchronized (WrappingCompactionStrategy.this) - { - Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput); - Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput); - - if (repairedTasks == null && unrepairedTasks == null) - return null; - - if (repairedTasks == null) - return unrepairedTasks; - if (unrepairedTasks == null) - return repairedTasks; - - List<AbstractCompactionTask> tasks = new ArrayList<>(); - tasks.addAll(repairedTasks); - tasks.addAll(unrepairedTasks); - return tasks; - } - } - }, false); - } - - @Override - public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) - { - assert !sstables.isEmpty(); - boolean userDefinedInRepaired = sstables.iterator().next().isRepaired(); - for (SSTableReader sstable : sstables) - { - if (userDefinedInRepaired != sstable.isRepaired()) - { - logger.error("You can't mix repaired and unrepaired sstables in a user defined compaction"); - return null; - } - } - if (userDefinedInRepaired) - return repaired.getUserDefinedTask(sstables, gcBefore); - else - return unrepaired.getUserDefinedTask(sstables, gcBefore); - } - - @Override - public synchronized int getEstimatedRemainingTasks() - { - assert repaired.getClass().equals(unrepaired.getClass()); - return repaired.getEstimatedRemainingTasks() + unrepaired.getEstimatedRemainingTasks(); - } - - @Override - public synchronized long getMaxSSTableBytes() - { - assert repaired.getClass().equals(unrepaired.getClass()); - return unrepaired.getMaxSSTableBytes(); - } - - public synchronized void maybeReloadCompactionStrategy(CFMetaData metadata) - { - if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass) - && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass) - && repaired.options.equals(metadata.compactionStrategyOptions) - && unrepaired.options.equals(metadata.compactionStrategyOptions)) - return; - - reloadCompactionStrategy(metadata); - } - - public synchronized void reloadCompactionStrategy(CFMetaData metadata) - { - if (repaired != null) - repaired.shutdown(); - if (unrepaired != null) - unrepaired.shutdown(); - repaired = metadata.createCompactionStrategyInstance(cfs); - unrepaired = metadata.createCompactionStrategyInstance(cfs); - startup(); - } - - public synchronized int getUnleveledSSTables() - { - if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired instanceof LeveledCompactionStrategy) - { - return ((LeveledCompactionStrategy)repaired).getLevelSize(0) + ((LeveledCompactionStrategy)unrepaired).getLevelSize(0); - } - return 0; - } - - public synchronized int[] getSSTableCountPerLevel() - { - if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired instanceof LeveledCompactionStrategy) - { - int [] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize(); - int [] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize(); - return sumArrays(repairedCountPerLevel, unrepairedCountPerLevel); - } - return null; - } - - public static int [] sumArrays(int[] a, int [] b) - { - int [] res = new int[Math.max(a.length, b.length)]; - for (int i = 0; i < res.length; i++) - { - if (i < a.length && i < b.length) - res[i] = a[i] + b[i]; - else if (i < a.length) - res[i] = a[i]; - else - res[i] = b[i]; - } - return res; - } - - @Override - public boolean shouldDefragment() - { - assert repaired.getClass().equals(unrepaired.getClass()); - return repaired.shouldDefragment(); - } - - @Override - public String getName() - { - assert repaired.getClass().equals(unrepaired.getClass()); - return repaired.getName(); - } - - @Override - public void replaceSSTables(Collection<SSTableReader> removed, Collection<SSTableReader> added) - { - throw new UnsupportedOperationException("Can't replace sstables in the wrapping compaction strategy"); - } - - @Override - public void addSSTable(SSTableReader added) - { - throw new UnsupportedOperationException("Can't add sstables to the wrapping compaction strategy"); - } - - @Override - public void removeSSTable(SSTableReader sstable) - { - throw new UnsupportedOperationException("Can't remove sstables from the wrapping compaction strategy"); - } - - public synchronized void handleNotification(INotification notification, Object sender) - { - if (notification instanceof SSTableAddedNotification) - { - SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification; - if (flushedNotification.added.isRepaired()) - repaired.addSSTable(flushedNotification.added); - else - unrepaired.addSSTable(flushedNotification.added); - } - else if (notification instanceof SSTableListChangedNotification) - { - SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; - Set<SSTableReader> repairedRemoved = new HashSet<>(); - Set<SSTableReader> repairedAdded = new HashSet<>(); - Set<SSTableReader> unrepairedRemoved = new HashSet<>(); - Set<SSTableReader> unrepairedAdded = new HashSet<>(); - - for (SSTableReader sstable : listChangedNotification.removed) - { - if (sstable.isRepaired()) - repairedRemoved.add(sstable); - else - unrepairedRemoved.add(sstable); - } - for (SSTableReader sstable : listChangedNotification.added) - { - if (sstable.isRepaired()) - repairedAdded.add(sstable); - else - unrepairedAdded.add(sstable); - } - if (!repairedRemoved.isEmpty()) - { - repaired.replaceSSTables(repairedRemoved, repairedAdded); - } - else - { - for (SSTableReader sstable : repairedAdded) - repaired.addSSTable(sstable); - } - - if (!unrepairedRemoved.isEmpty()) - { - unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded); - } - else - { - for (SSTableReader sstable : unrepairedAdded) - unrepaired.addSSTable(sstable); - } - } - else if (notification instanceof SSTableRepairStatusChanged) - { - for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable) - { - if (sstable.isRepaired()) - { - unrepaired.removeSSTable(sstable); - repaired.addSSTable(sstable); - } - else - { - repaired.removeSSTable(sstable); - unrepaired.addSSTable(sstable); - } - } - } - else if (notification instanceof SSTableDeletingNotification) - { - SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting; - if (sstable.isRepaired()) - repaired.removeSSTable(sstable); - else - unrepaired.removeSSTable(sstable); - } - } - - @Override - public List<SSTableReader> filterSSTablesForReads(List<SSTableReader> sstables) - { - // todo: union of filtered sstables or intersection? - return unrepaired.filterSSTablesForReads(repaired.filterSSTablesForReads(sstables)); - } - - @Override - public synchronized void startup() - { - super.startup(); - for (SSTableReader sstable : cfs.getSSTables()) - { - if (sstable.openReason != SSTableReader.OpenReason.EARLY) - { - if (sstable.isRepaired()) - repaired.addSSTable(sstable); - else - unrepaired.addSSTable(sstable); - } - } - repaired.startup(); - unrepaired.startup(); - } - - @Override - public synchronized void shutdown() - { - super.shutdown(); - repaired.shutdown(); - unrepaired.shutdown(); - } - - @Override - @SuppressWarnings("resource") - public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) - { - List<SSTableReader> repairedSSTables = new ArrayList<>(); - List<SSTableReader> unrepairedSSTables = new ArrayList<>(); - for (SSTableReader sstable : sstables) - if (sstable.isRepaired()) - repairedSSTables.add(sstable); - else - unrepairedSSTables.add(sstable); - ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range); - ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range); - List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size()); - scanners.addAll(repairedScanners.scanners); - scanners.addAll(unrepairedScanners.scanners); - return new ScannerList(scanners); - } - - public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) - { - return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup); - } - - public List<AbstractCompactionStrategy> getWrappedStrategies() - { - return Arrays.asList(repaired, unrepaired); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java index 4ab4446..6ad50d3 100644 --- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java @@ -351,7 +351,7 @@ public class ColumnFamilyMetrics { public Integer getValue() { - return cfs.getCompactionStrategy().getEstimatedRemainingTasks(); + return cfs.getCompactionStrategyManager().getEstimatedRemainingTasks(); } }); liveSSTableCount = createColumnFamilyGauge("LiveSSTableCount", new Gauge<Integer>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/metrics/CompactionMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index a62e3c4..20a5685 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -61,7 +61,7 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt for (String keyspaceName : Schema.instance.getKeyspaces()) { for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores()) - n += cfs.getCompactionStrategy().getEstimatedRemainingTasks(); + n += cfs.getCompactionStrategyManager().getEstimatedRemainingTasks(); } for (ThreadPoolExecutor collector : collectors) n += collector.getTaskCount() - collector.getCompletedTaskCount(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 30a2b6e..d24c579 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -249,7 +249,7 @@ public class CassandraDaemon { for (final ColumnFamilyStore store : cfs.concatWithIndexes()) { - if (store.getCompactionStrategy().shouldBeEnabled()) + if (store.getCompactionStrategyManager().shouldBeEnabled()) store.enableAutoCompaction(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index dd513b8..81b41bc 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -145,7 +145,7 @@ public class StandaloneScrubber } // Check (and repair) manifests - checkManifest(cfs.getCompactionStrategy(), cfs, sstables); + checkManifest(cfs.getCompactionStrategyManager(), cfs, sstables); CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES); SSTableDeletingTask.waitForDeletions(); System.exit(0); // We need that to stop non daemonized threads @@ -159,11 +159,10 @@ public class StandaloneScrubber } } - private static void checkManifest(AbstractCompactionStrategy strategy, ColumnFamilyStore cfs, Collection<SSTableReader> sstables) + private static void checkManifest(CompactionStrategyManager strategyManager, ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { - WrappingCompactionStrategy wrappingStrategy = (WrappingCompactionStrategy)strategy; - int maxSizeInMB = (int)((cfs.getCompactionStrategy().getMaxSSTableBytes()) / (1024L * 1024L)); - if (wrappingStrategy.getWrappedStrategies().size() == 2 && wrappingStrategy.getWrappedStrategies().get(0) instanceof LeveledCompactionStrategy) + int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L)); + if (strategyManager.getStrategies().size() == 2 && strategyManager.getStrategies().get(0) instanceof LeveledCompactionStrategy) { System.out.println("Checking leveled manifest"); Predicate<SSTableReader> repairedPredicate = new Predicate<SSTableReader>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index e38eb3c..cc8203e 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -63,8 +63,7 @@ public class LongLeveledCompactionStrategyTest ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname); store.disableAutoCompaction(); - WrappingCompactionStrategy strategy = ((WrappingCompactionStrategy) store.getCompactionStrategy()); - LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) strategy.getWrappedStrategies().get(1); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy)store.getCompactionStrategyManager().getStrategies().get(1); ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index b8f33d3..2d59abb 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -307,7 +307,7 @@ public class Util public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { int gcBefore = cfs.gcBefore(System.currentTimeMillis()); - AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore); + AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore); task.execute(null); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java index 235fd49..2b6f575 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java @@ -179,7 +179,7 @@ public class CompactionAwareWriterTest { assert txn.originals().size() == 1; int rowsWritten = 0; - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(txn.originals())) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(txn.originals())) { CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(System.currentTimeMillis())); ISSTableScanner scanner = scanners.scanners.get(0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index e5baab6..d03c35a 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -176,7 +176,7 @@ public class CompactionsPurgeTest rm.add(cfName, cellname(String.valueOf(5)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2); rm.applyUnsafe(); cfs.forceBlockingFlush(); - cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); + cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); // verify that minor compaction does GC when key is provably not // present in a non-compacted sstable @@ -223,7 +223,7 @@ public class CompactionsPurgeTest cfs.forceBlockingFlush(); // compact the sstables with the c1/c2 data and the c1 tombstone - cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); + cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone. http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 18418e8..a1fb33d 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -152,9 +152,9 @@ public class CompactionsTest public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception { ColumnFamilyStore store = testSingleSSTableCompaction(LeveledCompactionStrategy.class.getCanonicalName()); - WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) store.getCompactionStrategy(); + CompactionStrategyManager strategyManager = store.getCompactionStrategyManager(); // tombstone removal compaction should not promote level - assert strategy.getSSTableCountPerLevel()[0] == 1; + assert strategyManager.getSSTableCountPerLevel()[0] == 1; } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index c7935fe..d85bd6a 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -125,12 +125,12 @@ public class LeveledCompactionStrategyTest } waitForLeveling(cfs); - WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); // Checking we're not completely bad at math assert strategy.getSSTableCountPerLevel()[1] > 0; assert strategy.getSSTableCountPerLevel()[2] > 0; - Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(cfs.getSSTables()); + Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(cfs.getSSTables()); for (Collection<SSTableReader> sstableGroup : groupedSSTables) { int groupLevel = -1; @@ -176,7 +176,7 @@ public class LeveledCompactionStrategyTest } waitForLeveling(cfs); - WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); // Checking we're not completely bad at math assertTrue(strategy.getSSTableCountPerLevel()[1] > 0); assertTrue(strategy.getSSTableCountPerLevel()[2] > 0); @@ -195,7 +195,7 @@ public class LeveledCompactionStrategyTest */ private void waitForLeveling(ColumnFamilyStore cfs) throws InterruptedException { - WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); // L0 is the lowest priority, so when that's done, we know everything is done while (strategy.getSSTableCountPerLevel()[0] > 1) Thread.sleep(100); @@ -223,7 +223,7 @@ public class LeveledCompactionStrategyTest } waitForLeveling(cfs); - LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ((WrappingCompactionStrategy) cfs.getCompactionStrategy()).getWrappedStrategies().get(1); + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1); assert strategy.getLevelSize(1) > 0; // get LeveledScanner for level 1 sstables @@ -262,7 +262,7 @@ public class LeveledCompactionStrategyTest } waitForLeveling(cfs); cfs.forceBlockingFlush(); - LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ((WrappingCompactionStrategy) cfs.getCompactionStrategy()).getWrappedStrategies().get(1); + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1); cfs.disableAutoCompaction(); while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) @@ -314,8 +314,8 @@ public class LeveledCompactionStrategyTest while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) Thread.sleep(100); - WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy(); - List<AbstractCompactionStrategy> strategies = strategy.getWrappedStrategies(); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); + List<AbstractCompactionStrategy> strategies = strategy.getStrategies(); LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0); LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1); assertEquals(0, repaired.manifest.getLevelCount() ); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java index 755225e..f952b91 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java @@ -201,7 +201,6 @@ public class SSTableMetadataTest { Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard3"); - store.getCompactionStrategy(); for (int j = 0; j < 8; j++) { DecoratedKey key = Util.dk("row"+j); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7df3a5c9/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 9e1cb91..8f6ec16 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -100,7 +100,7 @@ public class SSTableRewriterTest extends SchemaLoader Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); assertEquals(1, sstables.size()); assertEquals(sstables.iterator().next().bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.getCount()); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables); LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);) { @@ -132,7 +132,7 @@ public class SSTableRewriterTest extends SchemaLoader Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); assertEquals(1, sstables.size()); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables); LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000);) { @@ -167,7 +167,7 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(1, sstables.size()); boolean checked = false; - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables); LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000)) { @@ -804,7 +804,7 @@ public class SSTableRewriterTest extends SchemaLoader cfs.addSSTable(s); Set<SSTableReader> sstables = Sets.newHashSet(s); assertEquals(1, sstables.size()); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables); LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, false); SSTableRewriter writer2 = new SSTableRewriter(cfs, txn, 1000, false, false))
