Merge from 1.0 into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/64a5e70e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/64a5e70e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/64a5e70e Branch: refs/heads/trunk Commit: 64a5e70ef5aacdfe244baa91b5698a326173082f Parents: 1bfb685 853a759 Author: Jonathan Ellis <[email protected]> Authored: Fri May 25 16:23:52 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Fri May 25 16:23:52 2012 -0500 ---------------------------------------------------------------------- .../db/compaction/AbstractCompactionTask.java | 5 ++ .../cassandra/db/compaction/CompactionManager.java | 21 +++++++- .../cassandra/db/compaction/LeveledManifest.java | 10 +++- .../compaction/SizeTieredCompactionStrategy.java | 40 +++++++-------- 4 files changed, 51 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/64a5e70e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index f8bab20,872ce0b..e09a012 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -111,31 -109,54 +111,50 @@@ public class CompactionManager implemen * It's okay to over-call (within reason) since the compactions are single-threaded, * and if a call is unnecessary, it will just be no-oped in the bucketing phase. 
*/ - public Future<Integer> submitBackground(final ColumnFamilyStore cfs) + public Future<?> submitBackground(final ColumnFamilyStore cfs) { + logger.debug("Scheduling a background task check for {}.{} with {}", + new Object[] {cfs.table.name, + cfs.columnFamily, + cfs.getCompactionStrategy().getClass().getSimpleName()}); - Callable<Integer> callable = new Callable<Integer>() + Runnable runnable = new WrappedRunnable() { - public Integer call() throws IOException + protected void runMayThrow() throws IOException { compactionLock.readLock().lock(); try { + logger.debug("Checking {}.{}", cfs.table.name, cfs.columnFamily); // log after we get the lock so we can see delays from that if any + if (!cfs.isValid()) + { + logger.debug("Aborting compaction for dropped CF"); - return 0; ++ return; + } + - boolean taskExecuted = false; AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); - List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs)); - logger.debug("{} minor compaction tasks available", tasks.size()); - for (AbstractCompactionTask task : tasks) + AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs)); - if (task == null || !task.markSSTablesForCompaction()) ++ if (task == null) + { - if (!task.markSSTablesForCompaction()) - { - logger.debug("Skipping {}; sstables are busy", task); - continue; - } - - taskExecuted = true; - try - { - task.execute(executor); - } - finally - { - task.unmarkSSTables(); - } ++ logger.debug("No tasks available"); + return; ++ } ++ if (!task.markSSTablesForCompaction()) ++ { ++ logger.debug("Unable to mark SSTables for {}", task); ++ return; + } - // newly created sstables might have made other compactions eligible - if (taskExecuted) - submitBackground(cfs); + try + { + task.execute(executor); + } + finally + { + task.unmarkSSTables(); + } + submitBackground(cfs); } - finally + finally { compactionLock.readLock().unlock(); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/64a5e70e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 28700da,7e06848..b2ba857 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@@ -468,10 -453,12 +470,12 @@@ public class LeveledManifes for (int i = generations.length - 1; i >= 0; i--) { List<SSTableReader> sstables = generations[i]; - long n = Math.max(0L, SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / (maxSSTableSizeInMB * 1024L * 1024); - logger.debug("Estimating " + n + " compaction tasks in level " + i); - tasks += n; - estimated[i] = Math.max(0L, SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / (maxSSTableSizeInMB * 1024 * 1024); ++ estimated[i] = Math.max(0L, SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / (maxSSTableSizeInMB * 1024L * 1024); + tasks += estimated[i]; } + + logger.debug("Estimating {} compactions to do for {}.{}", + new Object[] {Arrays.asList(estimated), cfs.table.name, cfs.columnFamily}); return Ints.checkedCast(tasks); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/64a5e70e/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 0281f08,636d6ba..63ec442 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@@ -52,14 -53,13 +52,15 @@@ public class SizeTieredCompactionStrate if (cfs.isCompactionDisabled()) { logger.debug("Compaction is currently disabled."); - return Collections.emptyList(); + return null; } - 
List<AbstractCompactionTask> tasks = new ArrayList<AbstractCompactionTask>(); - List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getSSTables()), minSSTableSize); + Set<SSTableReader> candidates = cfs.getUncompactingSSTables(); + List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), minSSTableSize); + logger.debug("Compaction buckets are {}", buckets); + updateEstimatedCompactionsByTasks(buckets); + List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>(); for (List<SSTableReader> bucket : buckets) { if (bucket.size() < cfs.getMinimumCompactionThreshold()) @@@ -149,43 -132,43 +151,39 @@@ // look for a bucket containing similar-sized files: // group in the same bucket if it's w/in 50% of the average for this bucket, // or this file and the bucket are all considered "small" (less than `minSSTableSize`) - for (Entry<List<T>, Long> entry : buckets.entrySet()) + for (Entry<Long, List<T>> entry : buckets.entrySet()) { - List<T> bucket = entry.getKey(); - long averageSize = entry.getValue(); - if ((size > (averageSize / 2) && size < (3 * averageSize) / 2) - || (size < minSSTableSize && averageSize < minSSTableSize)) + List<T> bucket = entry.getValue(); + long oldAverageSize = entry.getKey(); + if ((size > (oldAverageSize / 2) && size < (3 * oldAverageSize) / 2) + || (size < minSSTableSize && oldAverageSize < minSSTableSize)) { - // remove and re-add because adding changes the hash - buckets.remove(bucket); - long totalSize = bucket.size() * averageSize; - averageSize = (totalSize + size) / (bucket.size() + 1); + // remove and re-add under new new average size + buckets.remove(oldAverageSize); + long totalSize = bucket.size() * oldAverageSize; + long newAverageSize = (totalSize + size) / (bucket.size() + 1); bucket.add(pair.left); - buckets.put(bucket, averageSize); - bFound = true; - break; + buckets.put(newAverageSize, bucket); + continue outer; } } + // no similar 
bucket found; put it in a new one - if (!bFound) - { - ArrayList<T> bucket = new ArrayList<T>(); - bucket.add(pair.left); - buckets.put(bucket, size); - } + ArrayList<T> bucket = new ArrayList<T>(); + bucket.add(pair.left); + buckets.put(size, bucket); } - return new LinkedList<List<T>>(buckets.keySet()); + return new ArrayList<List<T>>(buckets.values()); } - private void updateEstimatedCompactionsByTasks(List<AbstractCompactionTask> tasks) + private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks) { int n = 0; - for (AbstractCompactionTask task: tasks) + for (List<SSTableReader> bucket: tasks) { - if (!(task instanceof CompactionTask)) - continue; - - Collection<SSTableReader> sstablesToBeCompacted = task.getSSTables(); - if (sstablesToBeCompacted.size() >= cfs.getMinimumCompactionThreshold()) - n += Math.ceil((double)sstablesToBeCompacted.size() / cfs.getMaximumCompactionThreshold()); + if (bucket.size() >= cfs.getMinimumCompactionThreshold()) + n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold()); } estimatedRemainingTasks = n; }
