Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1134460&r1=1134459&r2=1134460&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Jun 10 22:13:54 2011 @@ -24,7 +24,6 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.*; -import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -34,7 +33,6 @@ import javax.management.ObjectName; import org.apache.commons.collections.PredicateUtils; import org.apache.commons.collections.iterators.CollatingIterator; import org.apache.commons.collections.iterators.FilterIterator; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +49,6 @@ import org.apache.cassandra.service.Anti import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.OperationType; import org.apache.cassandra.utils.*; -import org.cliffc.high_scale_lib.NonBlockingHashMap; /** * A singleton which manages a private executor of ongoing compactions. A readwrite lock @@ -85,7 +82,6 @@ public class CompactionManager implement } private CompactionExecutor executor = new CompactionExecutor(); - private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>(); /** * @return A lock, for which acquisition means no compactions can run. @@ -111,37 +107,20 @@ public class CompactionManager implement { if (cfs.isInvalid()) return 0; - Integer minThreshold = cfs.getMinimumCompactionThreshold(); - Integer maxThreshold = cfs.getMaximumCompactionThreshold(); - - if (minThreshold == 0 || maxThreshold == 0) - { - logger.debug("Compaction is currently disabled."); - return 0; - } - logger.debug("Checking to see if compaction of " + cfs.columnFamily + " would be useful"); - Set<List<SSTableReader>> buckets = getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L); - updateEstimateFor(cfs, buckets); - int gcBefore = getDefaultGcBefore(cfs); - - for (List<SSTableReader> sstables : buckets) + + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + for (AbstractCompactionTask task : strategy.getBackgroundTasks(getDefaultGcBefore(cfs))) { - if (sstables.size() < minThreshold) - continue; - // if we have too many to compact all at once, compact older ones first -- this avoids - // re-compacting files we just created. 
- Collections.sort(sstables); - Collection<SSTableReader> tocompact = cfs.getDataTracker().markCompacting(sstables, minThreshold, maxThreshold); - if (tocompact == null) - // enough threads are busy in this bucket + if (!cfs.getDataTracker().markCompacting(task)) continue; + try { - return doCompaction(cfs, tocompact, gcBefore); + task.execute(executor); } finally { - cfs.getDataTracker().unmarkCompacting(tocompact); + cfs.getDataTracker().unmarkCompacting(task); } } } @@ -155,29 +134,6 @@ public class CompactionManager implement return executor.submit(callable); } - private void updateEstimateFor(ColumnFamilyStore cfs, Set<List<SSTableReader>> buckets) - { - Integer minThreshold = cfs.getMinimumCompactionThreshold(); - Integer maxThreshold = cfs.getMaximumCompactionThreshold(); - - if (minThreshold > 0 && maxThreshold > 0) - { - int n = 0; - for (List<SSTableReader> sstables : buckets) - { - if (sstables.size() >= minThreshold) - { - n += Math.ceil((double)sstables.size() / maxThreshold); - } - } - estimatedCompactions.put(cfs, n); - } - else - { - logger.debug("Compaction is currently disabled."); - } - } - public void performCleanup(final ColumnFamilyStore cfStore, final NodeId.OneShotRenewer renewer) throws InterruptedException, ExecutionException { Callable<Object> runnable = new Callable<Object>() @@ -273,10 +229,10 @@ public class CompactionManager implement public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException { - submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get(); + submitMajor(cfStore, getDefaultGcBefore(cfStore)).get(); } - public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore) + public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final int gcBefore) { Callable<Object> callable = new Callable<Object>() { @@ -288,45 +244,30 @@ public class CompactionManager implement { if (cfStore.isInvalid()) return this; - Collection<SSTableReader> sstables; - if (skip > 0) + AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy(); + for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore)) { - sstables = new ArrayList<SSTableReader>(); - for (SSTableReader sstable : cfStore.getSSTables()) + if (!cfStore.getDataTracker().markCompacting(task, 0, Integer.MAX_VALUE)) + return this; + try { - if (sstable.length() < skip * 1024L * 1024L * 1024L) + // downgrade the lock acquisition + compactionLock.readLock().lock(); + compactionLock.writeLock().unlock(); + try { - sstables.add(sstable); + return task.execute(executor); + } + finally + { + compactionLock.readLock().unlock(); } - } - } - else - { - sstables = cfStore.getSSTables(); - } - - Collection<SSTableReader> tocompact = cfStore.getDataTracker().markCompacting(sstables, 0, Integer.MAX_VALUE); - if (tocompact == null || tocompact.isEmpty()) - return this; - try - { - // downgrade the lock acquisition - compactionLock.readLock().lock(); - compactionLock.writeLock().unlock(); - try - { - doCompaction(cfStore, tocompact, gcBefore); } finally { - compactionLock.readLock().unlock(); + cfStore.getDataTracker().unmarkCompacting(task); } } - finally - { - cfStore.getDataTracker().unmarkCompacting(tocompact); - } - return this; } finally { @@ -334,6 +275,7 @@ public class CompactionManager implement if (compactionLock.writeLock().isHeldByCurrentThread()) compactionLock.writeLock().unlock(); } + return this; } }; return executor.submit(callable); @@ -408,11 +350,12 @@ public class CompactionManager implement 
// attempt to schedule the set else if ((sstables = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null) { - String location = cfs.table.getDataFileLocation(1); // success: perform the compaction try { - doCompactionWithoutSizeEstimation(cfs, sstables, gcBefore, location); + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + AbstractCompactionTask task = strategy.getUserDefinedTask(sstables, gcBefore); + task.execute(executor); } finally { @@ -484,141 +427,6 @@ public class CompactionManager implement } } - int doCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore) throws IOException - { - if (sstables.size() < 2) - { - logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "; use forceUserDefinedCompaction if you wish to force compaction of single sstables (e.g. for tombstone collection)"); - return 0; - } - - Table table = cfs.table; - - // If the compaction file path is null that means we have no space left for this compaction. - // try again w/o the largest one. - Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(sstables); - while (smallerSSTables.size() > 1) - { - String compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables)); - if (compactionFileLocation != null) - return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, gcBefore, compactionFileLocation); - - logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", ")); - smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables)); - } - - logger.error("insufficient space to compact even the two smallest files, aborting"); - return 0; - } - - /** - * For internal use and testing only. The rest of the system should go through the submit* methods, - * which are properly serialized. - */ - int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, String compactionFileLocation) throws IOException - { - // The collection of sstables passed may be empty (but not null); even if - // it is not empty, it may compact down to nothing if all rows are deleted. - assert sstables != null; - - Table table = cfs.table; - if (DatabaseDescriptor.isSnapshotBeforeCompaction()) - table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily); - - // sanity check: all sstables must belong to the same cfs - for (SSTableReader sstable : sstables) - assert sstable.descriptor.cfname.equals(cfs.columnFamily); - - // compaction won't normally compact a single sstable, so if that's what we're doing - // it must have been requested manually by the user, which probably means he wants to force - // tombstone purge, which won't happen unless we force deserializing the rows. - boolean forceDeserialize = sstables.size() == 1; - CompactionController controller = new CompactionController(cfs, sstables, gcBefore, forceDeserialize); - // new sstables from flush can be added during a compaction, but only the compaction can remove them, - // so in our single-threaded compaction world this is a valid way of determining if we're compacting - // all the sstables (that existed when we started) - CompactionType type = controller.isMajor() - ? 
CompactionType.MAJOR - : CompactionType.MINOR; - logger.info("Compacting {}: {}", type, sstables); - - long startTime = System.currentTimeMillis(); - long totalkeysWritten = 0; - - // TODO the int cast here is potentially buggy - int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables)); - if (logger.isDebugEnabled()) - logger.debug("Expected bloom filter size : " + expectedBloomFilterSize); - - SSTableWriter writer; - CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close() - Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate()); - Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>(); - - executor.beginCompaction(ci); - try - { - if (!nni.hasNext()) - { - // don't mark compacted in the finally block, since if there _is_ nondeleted data, - // we need to sync it (via closeAndOpen) first, so there is no period during which - // a crash could cause data loss. - cfs.markCompacted(sstables); - return 0; - } - - writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables); - while (nni.hasNext()) - { - AbstractCompactedRow row = nni.next(); - long position = writer.append(row); - totalkeysWritten++; - - if (DatabaseDescriptor.getPreheatKeyCache()) - { - for (SSTableReader sstable : sstables) - { - if (sstable.getCachedPosition(row.key) != null) - { - cachedKeys.put(row.key, position); - break; - } - } - } - } - } - finally - { - ci.close(); - executor.finishCompaction(ci); - } - - SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables)); - cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable)); - for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off - ssTable.cacheKey(entry.getKey(), entry.getValue()); - submitMinorIfNeeded(cfs); - - long dTime = System.currentTimeMillis() - startTime; - long startsize = SSTable.getTotalBytes(sstables); - long endsize = ssTable.length(); - double ratio = (double)endsize / (double)startsize; - logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.", - writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime)); - return sstables.size(); - } - - private static long getMaxDataAge(Collection<SSTableReader> sstables) - { - long max = 0; - for (SSTableReader sstable : sstables) - { - if (sstable.maxDataAge > max) - max = sstable.maxDataAge; - } - return max; - } - /** * Deserialize everything in the CFS and re-serialize w/ the newest version. Also attempts to recover * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted @@ -975,70 +783,6 @@ public class CompactionManager implement } } - /* - * Group files of similar size into buckets. 
- */ - static <T> Set<List<T>> getBuckets(Collection<Pair<T, Long>> files, long min) - { - // Sort the list in order to get deterministic results during the grouping below - List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files); - Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>() - { - public int compare(Pair<T, Long> p1, Pair<T, Long> p2) - { - return p1.right.compareTo(p2.right); - } - }); - - Map<List<T>, Long> buckets = new HashMap<List<T>, Long>(); - - for (Pair<T, Long> pair: sortedFiles) - { - long size = pair.right; - - boolean bFound = false; - // look for a bucket containing similar-sized files: - // group in the same bucket if it's w/in 50% of the average for this bucket, - // or this file and the bucket are all considered "small" (less than `min`) - for (Entry<List<T>, Long> entry : buckets.entrySet()) - { - List<T> bucket = entry.getKey(); - long averageSize = entry.getValue(); - if ((size > (averageSize / 2) && size < (3 * averageSize) / 2) - || (size < min && averageSize < min)) - { - // remove and re-add because adding changes the hash - buckets.remove(bucket); - long totalSize = bucket.size() * averageSize; - averageSize = (totalSize + size) / (bucket.size() + 1); - bucket.add(pair.left); - buckets.put(bucket, averageSize); - bFound = true; - break; - } - } - // no similar bucket found; put it in a new one - if (!bFound) - { - ArrayList<T> bucket = new ArrayList<T>(); - bucket.add(pair.left); - buckets.put(bucket, size); - } - } - - return buckets.keySet(); - } - - private static Collection<Pair<SSTableReader, Long>> convertSSTablesToPairs(Collection<SSTableReader> collection) - { - Collection<Pair<SSTableReader, Long>> tablePairs = new ArrayList<Pair<SSTableReader, Long>>(); - for(SSTableReader table: collection) - { - tablePairs.add(new Pair<SSTableReader, Long>(table, table.length())); - } - return tablePairs; - } - /** * Is not scheduled, because it is performing disjoint work from sstable compaction. */ @@ -1169,7 +913,7 @@ public class CompactionManager implement return executor.submit(runnable); } - private static int getDefaultGcBefore(ColumnFamilyStore cfs) + static int getDefaultGcBefore(ColumnFamilyStore cfs) { return cfs.isIndex() ? 
Integer.MAX_VALUE @@ -1201,7 +945,7 @@ public class CompactionManager implement return executor.getActiveCount(); } - private static class CompactionExecutor extends DebuggableThreadPoolExecutor + private static class CompactionExecutor extends DebuggableThreadPoolExecutor implements CompactionExecutorStatsCollector { // a synchronized identity set of running tasks to their compaction info private final Set<CompactionInfo.Holder> compactions; @@ -1222,12 +966,12 @@ public class CompactionManager implement return Math.max(1, DatabaseDescriptor.getConcurrentCompactors()); } - void beginCompaction(CompactionInfo.Holder ci) + public void beginCompaction(CompactionInfo.Holder ci) { compactions.add(ci); } - void finishCompaction(CompactionInfo.Holder ci) + public void finishCompaction(CompactionInfo.Holder ci) { compactions.remove(ci); } @@ -1238,6 +982,12 @@ public class CompactionManager implement } } + public interface CompactionExecutorStatsCollector + { + void beginCompaction(CompactionInfo.Holder ci); + void finishCompaction(CompactionInfo.Holder ci); + } + public List<CompactionInfo> getCompactions() { List<CompactionInfo> out = new ArrayList<CompactionInfo>(); @@ -1257,8 +1007,13 @@ public class CompactionManager implement public int getPendingTasks() { int n = 0; - for (Integer i : estimatedCompactions.values()) - n += i; + for (String tableName : DatabaseDescriptor.getTables()) + { + for (ColumnFamilyStore cfs : Table.open(tableName).getColumnFamilyStores()) + { + n += cfs.getCompactionStrategy().getEstimatedRemainingTasks(); + } + } return (int) (executor.getTaskCount() - executor.getCompletedTaskCount()) + n; }
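The CompactionManager changes above remove all compaction policy from the manager itself: it no longer buckets sstables, estimates pending work, or drives doCompaction(), but asks each ColumnFamilyStore's AbstractCompactionStrategy for ready-made AbstractCompactionTasks and merely marks, executes, and unmarks them. For orientation, here is a minimal hypothetical strategy satisfying that contract. The AbstractCompactionStrategy base class is not part of this diff, so its exact shape is an assumption inferred from the SizeTieredCompactionStrategy overrides added later in this commit: a (ColumnFamilyStore, Map<String, String>) constructor, a protected cfs field, and the four methods shown.

    package org.apache.cassandra.db.compaction;

    import java.util.*;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.io.sstable.SSTableReader;

    // Hypothetical sketch, illustration only -- not part of this commit.
    public class CompactEverythingStrategy extends AbstractCompactionStrategy
    {
        public CompactEverythingStrategy(ColumnFamilyStore cfs, Map<String, String> options)
        {
            super(cfs, options); // assumed super-constructor, mirroring SizeTieredCompactionStrategy below
        }

        // Consulted on the background (minor) path; each returned task is marked,
        // executed on the CompactionExecutor, and unmarked in turn.
        public List<AbstractCompactionTask> getBackgroundTasks(final int gcBefore)
        {
            Collection<SSTableReader> sstables = cfs.getSSTables();
            if (sstables.size() < 2)
                return Collections.<AbstractCompactionTask>emptyList(); // CompactionTask declines single-sstable requests anyway
            return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, sstables, gcBefore));
        }

        // Consulted by submitMajor(); for this toy strategy a major compaction is the same set of work.
        public List<AbstractCompactionTask> getMaximalTasks(final int gcBefore)
        {
            return getBackgroundTasks(gcBefore);
        }

        // Consulted by forceUserDefinedCompaction(); isUserDefined(true) bypasses the
        // free-space retry loop in CompactionTask.execute().
        public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
        {
            return new CompactionTask(cfs, sstables, gcBefore)
                   .isUserDefined(true)
                   .compactionFileLocation(cfs.table.getDataFileLocation(1));
        }

        // Feeds the per-CFS pending-task estimate now summed in getPendingTasks() above.
        public int getEstimatedRemainingTasks()
        {
            return cfs.getSSTables().size() < 2 ? 0 : 1;
        }
    }

Note the division of labor this implies: everything scheduling-related (the compaction lock, markCompacting bookkeeping, executor statistics) stays in CompactionManager; a strategy only decides which sstables belong together.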
Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1134460&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Fri Jun 10 22:13:54 2011 @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.collections.PredicateUtils; +import org.apache.commons.collections.iterators.FilterIterator; +import org.apache.commons.lang.StringUtils; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableWriter; + +public class CompactionTask extends AbstractCompactionTask +{ + private static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); + protected String compactionFileLocation; + protected final int gcBefore; + protected boolean isUserDefined; + + public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore) + { + super(cfs, sstables); + compactionFileLocation = null; + this.gcBefore = gcBefore; + this.isUserDefined = false; + } + + /** + * For internal use and testing only. The rest of the system should go through the submit* methods, + * which are properly serialized. + */ + public int execute(CompactionExecutorStatsCollector collector) throws IOException + { + if (!isUserDefined) + { + if (sstables.size() < 2) + { + logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + ". " + + "Use forceUserDefinedCompaction if you wish to force compaction of single sstables " + + "(e.g. for tombstone collection)"); + return 0; + } + + if (compactionFileLocation == null) + compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables)); + + // If the compaction file path is null that means we have no space left for this compaction. + // Try again w/o the largest one. 
+ if (compactionFileLocation == null) + { + Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(sstables); + while (compactionFileLocation == null && smallerSSTables.size() > 1) + { + logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", ")); + smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables)); + compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables)); + } + } + + if (compactionFileLocation == null) + { + logger.warn("insufficient space to compact even the two smallest files, aborting"); + return 0; + } + } + + // The collection of sstables passed may be empty (but not null); even if + // it is not empty, it may compact down to nothing if all rows are deleted. + assert sstables != null; + + if (DatabaseDescriptor.isSnapshotBeforeCompaction()) + cfs.table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily); + + // sanity check: all sstables must belong to the same cfs + for (SSTableReader sstable : sstables) + assert sstable.descriptor.cfname.equals(cfs.columnFamily); + + CompactionController controller = new CompactionController(cfs, sstables, gcBefore, isUserDefined); + // new sstables from flush can be added during a compaction, but only the compaction can remove them, + // so in our single-threaded compaction world this is a valid way of determining if we're compacting + // all the sstables (that existed when we started) + CompactionType type = controller.isMajor() + ? CompactionType.MAJOR + : CompactionType.MINOR; + logger.info("Compacting {}: {}", type, sstables); + + long startTime = System.currentTimeMillis(); + long totalkeysWritten = 0; + + // TODO the int cast here is potentially buggy + int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables)); + if (logger.isDebugEnabled()) + logger.debug("Expected bloom filter size : " + expectedBloomFilterSize); + + SSTableWriter writer; + CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close() + Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate()); + Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>(); + + if (collector != null) + collector.beginCompaction(ci); + try + { + if (!nni.hasNext()) + { + // don't mark compacted in the finally block, since if there _is_ nondeleted data, + // we need to sync it (via closeAndOpen) first, so there is no period during which + // a crash could cause data loss. 
+ cfs.markCompacted(sstables); + return 0; + } + + writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables); + while (nni.hasNext()) + { + AbstractCompactedRow row = nni.next(); + long position = writer.append(row); + totalkeysWritten++; + + if (DatabaseDescriptor.getPreheatKeyCache()) + { + for (SSTableReader sstable : sstables) + { + if (sstable.getCachedPosition(row.key) != null) + { + cachedKeys.put(row.key, position); + break; + } + } + } + } + } + finally + { + ci.close(); + if (collector != null) + collector.finishCompaction(ci); + } + + SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables)); + cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable)); + for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off + ssTable.cacheKey(entry.getKey(), entry.getValue()); + CompactionManager.instance.submitMinorIfNeeded(cfs); + + long dTime = System.currentTimeMillis() - startTime; + long startsize = SSTable.getTotalBytes(sstables); + long endsize = ssTable.length(); + double ratio = (double)endsize / (double)startsize; + logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.", + writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime)); + return sstables.size(); + } + + public static long getMaxDataAge(Collection<SSTableReader> sstables) + { + long max = 0; + for (SSTableReader sstable : sstables) + { + if (sstable.maxDataAge > max) + max = sstable.maxDataAge; + } + return max; + } + + public CompactionTask compactionFileLocation(String compactionFileLocation) + { + this.compactionFileLocation = compactionFileLocation; + return this; + } + + public CompactionTask isUserDefined(boolean isUserDefined) + { + this.isUserDefined = isUserDefined; + return this; + } +} Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java?rev=1134460&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java Fri Jun 10 22:13:54 2011 @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.util.*; +import java.util.Map.Entry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.utils.Pair; + +public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy +{ + private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class); + protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L; + protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size"; + protected static long minSSTableSize; + protected volatile int estimatedRemainingTasks; + + public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) + { + super(cfs, options); + this.estimatedRemainingTasks = 0; + String optionValue = options.get(MIN_SSTABLE_SIZE_KEY); + minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE; + } + + public List<AbstractCompactionTask> getBackgroundTasks(final int gcBefore) + { + if (cfs.isCompactionDisabled()) + { + logger.debug("Compaction is currently disabled."); + return Collections.<AbstractCompactionTask>emptyList(); + } + + List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>(); + List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getSSTables()), minSSTableSize); + + for (List<SSTableReader> bucket : buckets) + { + if (bucket.size() < cfs.getMinimumCompactionThreshold()) + continue; + + Collections.sort(bucket); + tasks.add(new CompactionTask(cfs, bucket.subList(0, Math.min(bucket.size(), cfs.getMaximumCompactionThreshold())), gcBefore)); + } + + updateEstimatedCompactionsByTasks(tasks); + return tasks; + } + + public List<AbstractCompactionTask> getMaximalTasks(final int gcBefore) + { + List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>(); + if (!cfs.getSSTables().isEmpty()) + tasks.add(new CompactionTask(cfs, cfs.getSSTables(), gcBefore)); + return tasks; + } + + public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore) + { + return new CompactionTask(cfs, sstables, gcBefore) + .isUserDefined(true) + .compactionFileLocation(cfs.table.getDataFileLocation(1)); + } + + public int getEstimatedRemainingTasks() + { + return estimatedRemainingTasks; + } + + private static List<Pair<SSTableReader, Long>> createSSTableAndLengthPairs(Collection<SSTableReader> collection) + { + List<Pair<SSTableReader, Long>> tableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>(); + for(SSTableReader table: collection) + tableLengthPairs.add(new Pair<SSTableReader, Long>(table, table.length())); + return tableLengthPairs; + } + + /* + * Group files of similar size into buckets. 
+ */ + <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long minSSTableSize) + { + // Sort the list in order to get deterministic results during the grouping below + List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files); + Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>() + { + public int compare(Pair<T, Long> p1, Pair<T, Long> p2) + { + return p1.right.compareTo(p2.right); + } + }); + + Map<List<T>, Long> buckets = new HashMap<List<T>, Long>(); + + for (Pair<T, Long> pair: sortedFiles) + { + long size = pair.right; + + boolean bFound = false; + // look for a bucket containing similar-sized files: + // group in the same bucket if it's w/in 50% of the average for this bucket, + // or this file and the bucket are all considered "small" (less than `minSSTableSize`) + for (Entry<List<T>, Long> entry : buckets.entrySet()) + { + List<T> bucket = entry.getKey(); + long averageSize = entry.getValue(); + if ((size > (averageSize / 2) && size < (3 * averageSize) / 2) + || (size < minSSTableSize && averageSize < minSSTableSize)) + { + // remove and re-add because adding changes the hash + buckets.remove(bucket); + long totalSize = bucket.size() * averageSize; + averageSize = (totalSize + size) / (bucket.size() + 1); + bucket.add(pair.left); + buckets.put(bucket, averageSize); + bFound = true; + break; + } + } + // no similar bucket found; put it in a new one + if (!bFound) + { + ArrayList<T> bucket = new ArrayList<T>(); + bucket.add(pair.left); + buckets.put(bucket, size); + } + } + + return new LinkedList<List<T>>(buckets.keySet()); + } + + private void updateEstimatedCompactionsByTasks(List<AbstractCompactionTask> tasks) + { + int n = 0; + for (AbstractCompactionTask task: tasks) + { + if (!(task instanceof CompactionTask)) + continue; + + Collection<SSTableReader> sstablesToBeCompacted = task.getSSTables(); + if (sstablesToBeCompacted.size() >= cfs.getMinimumCompactionThreshold()) + n += Math.ceil((double)sstablesToBeCompacted.size() / cfs.getMaximumCompactionThreshold()); + } + estimatedRemainingTasks = n; + } + + public long getMinSSTableSize() + { + return minSSTableSize; + } + + public String toString() + { + return String.format("SizeTieredCompactionStrategy[%s/%s]", + cfs.getMinimumCompactionThreshold(), + cfs.getMaximumCompactionThreshold()); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1134460&r1=1134459&r2=1134460&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Jun 10 22:13:54 2011 @@ -39,7 +39,6 @@ import org.apache.cassandra.config.CFMet import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.SystemTable; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.commitlog.CommitLog; Modified: cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff ============================================================================== --- cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java (original) +++ cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java Fri Jun 10 22:13:54 2011 @@ -131,7 +131,8 @@ public class LongCompactionSpeedTest ext Thread.sleep(1000); long start = System.currentTimeMillis(); - CompactionManager.instance.doCompaction(store, sstables, (int) (System.currentTimeMillis() / 1000) - DatabaseDescriptor.getCFMetaData(TABLE1, "Standard1").getGcGraceSeconds()); + final int gcBefore = (int) (System.currentTimeMillis() / 1000) - DatabaseDescriptor.getCFMetaData(TABLE1, "Standard1").getGcGraceSeconds(); + new CompactionTask(store, sstables, gcBefore).execute(null); System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms", this.getClass().getName(), sstableCount, Modified: cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java Fri Jun 10 22:13:54 2011 @@ -144,6 +144,8 @@ public class CliTest extends CleanupHelp "create column family Countries with comparator=UTF8Type and column_metadata=[ {column_name: name, validation_class: UTF8Type} ];", "set Countries[1][name] = USA;", "get Countries[1][name];", + "update column family Countries with compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy';", + "create column family Cities with compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' and compaction_strategy_options = [{min_sstable_size:1024}];", "set myCF['key']['scName']['firstname'] = 'John';", "get myCF['key']['scName']", "assume CF3 keys as utf8;", @@ -264,4 +266,4 @@ public class CliTest extends CleanupHelp assertEquals(unescaped, CliUtils.unescapeSQLString("'" + escaped + "'")); assertEquals(escaped, CliUtils.escapeSQLString(unescaped)); } -} \ No newline at end of file +} Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Fri Jun 10 22:13:54 2011 @@ -77,7 +77,7 @@ public class DefsTest extends CleanupHel assert cd2.min_compaction_threshold == null; assert cd.row_cache_save_period_in_seconds == null; assert cd2.row_cache_save_period_in_seconds == null; - + assert cd.compaction_strategy == null; } @Test Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff ============================================================================== --- 
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java Fri Jun 10 22:13:54 2011 @@ -19,9 +19,7 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Collection; -import java.util.Collections; import java.util.concurrent.ExecutionException; import org.junit.Test; @@ -35,7 +33,6 @@ import org.apache.cassandra.db.ColumnFam import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.Util; @@ -86,7 +83,7 @@ public class CompactionsPurgeTest extend cfs.forceBlockingFlush(); // major compact and test that all columns but the resurrected one is completely gone - CompactionManager.instance.submitMajor(cfs, 0, Integer.MAX_VALUE).get(); + CompactionManager.instance.submitMajor(cfs, Integer.MAX_VALUE).get(); cfs.invalidateCachedRow(key); ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName))); assertColumns(cf, "5"); @@ -136,7 +133,7 @@ public class CompactionsPurgeTest extend rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2); rm.apply(); cfs.forceBlockingFlush(); - CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, Integer.MAX_VALUE); + new CompactionTask(cfs, sstablesIncomplete, Integer.MAX_VALUE).execute(null); // verify that minor compaction does not GC when key is present // in a non-compacted sstable Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Fri Jun 10 22:13:54 2011 @@ -19,38 +19,29 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.List; import java.util.ArrayList; -import java.util.Set; import java.util.HashSet; - -import org.apache.cassandra.Util; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.junit.Test; +import static junit.framework.Assert.assertEquals; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.CleanupHelper; -import org.apache.cassandra.db.Table; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.Table; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; -import static junit.framework.Assert.assertEquals; public class CompactionsTest extends CleanupHelper { 
public static final String TABLE1 = "Keyspace1"; - public static final String TABLE2 = "Keyspace2"; - public static final InetAddress LOCAL = FBUtilities.getLocalAddress(); - - public static final int MIN_COMPACTION_THRESHOLD = 2; @Test public void testCompactions() throws IOException, ExecutionException, InterruptedException @@ -103,61 +94,6 @@ public class CompactionsTest extends Cle } @Test - public void testGetBuckets() - { - List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>(); - String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" }; - for (String st : strings) - { - Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); - pairs.add(pair); - } - - Set<List<String>> buckets = CompactionManager.getBuckets(pairs, 2); - assertEquals(3, buckets.size()); - - for (List<String> bucket : buckets) - { - assertEquals(2, bucket.size()); - assertEquals(bucket.get(0).length(), bucket.get(1).length()); - assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0)); - } - - pairs.clear(); - buckets.clear(); - - String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" }; - for (String st : strings2) - { - Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); - pairs.add(pair); - } - - buckets = CompactionManager.getBuckets(pairs, 2); - assertEquals(2, buckets.size()); - - for (List<String> bucket : buckets) - { - assertEquals(3, bucket.size()); - assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0)); - assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0)); - } - - // Test the "min" functionality - pairs.clear(); - buckets.clear(); - - String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" }; - for (String st : strings3) - { - Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); - pairs.add(pair); - } - - buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10 - assertEquals(1, buckets.size()); - } - @Test public void testEchoedRow() throws IOException, ExecutionException, InterruptedException { // This test check that EchoedRow doesn't skipp rows: see CASSANDRA-2653 Added: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java?rev=1134460&view=auto ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java (added) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java Fri Jun 10 22:13:54 2011 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; + +import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.utils.Pair; + +public class SizeTieredCompactionStrategyTest { + @Test + public void testGetBuckets() + { + List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>(); + String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" }; + for (String st : strings) + { + Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); + pairs.add(pair); + } + + Map<String, String> emptyOptions = new HashMap<String, String>(); + SizeTieredCompactionStrategy strategy = new SizeTieredCompactionStrategy(mock(ColumnFamilyStore.class), emptyOptions); + List<List<String>> buckets = strategy.getBuckets(pairs, 2); + assertEquals(3, buckets.size()); + + for (List<String> bucket : buckets) + { + assertEquals(2, bucket.size()); + assertEquals(bucket.get(0).length(), bucket.get(1).length()); + assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0)); + } + + pairs.clear(); + buckets.clear(); + + String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" }; + for (String st : strings2) + { + Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); + pairs.add(pair); + } + + buckets = strategy.getBuckets(pairs, 2); + assertEquals(2, buckets.size()); + + for (List<String> bucket : buckets) + { + assertEquals(3, bucket.size()); + assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0)); + assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0)); + } + + // Test the "min" functionality + pairs.clear(); + buckets.clear(); + + String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" }; + for (String st : strings3) + { + Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); + pairs.add(pair); + } + + buckets = strategy.getBuckets(pairs, 10); // notice the min is 10 + assertEquals(1, buckets.size()); + } +} Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Jun 10 22:13:54 2011 @@ -28,7 +28,6 @@ import java.util.*; import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.db.filter.QueryFilter;
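Taken together with the DefsTest assertion that compaction_strategy is now part of the column family definition, the commit makes the compaction strategy a per-column-family schema property. The CliTest additions above exercise the new syntax; the two statements are repeated here for reference. SizeTieredCompactionStrategy parses min_sstable_size with Long.parseLong, i.e. the option is a size in bytes, and falls back to DEFAULT_MIN_SSTABLE_SIZE (50 MB) when the option is absent:

    update column family Countries with compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy';
    create column family Cities with compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' and compaction_strategy_options = [{min_sstable_size:1024}];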