Author: jbellis
Date: Fri Nov 18 16:37:39 2011
New Revision: 1203729
URL: http://svn.apache.org/viewvc?rev=1203729&view=rev
Log:
update size-tiered compaction to prioritize small tiers
patch by jbellis; reviewed by slebresne for CASSANDRA-2407
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Nov 18 16:37:39 2011
@@ -1,4 +1,5 @@
1.1-dev
+ * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
* add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
* off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
* EACH_QUORUM is only supported for writes (CASSANDRA-3272)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri
Nov 18 16:37:39 2011
@@ -1036,6 +1036,11 @@ public class ColumnFamilyStore implement
return data.getSSTables();
}
+ public Set<SSTableReader> getUncompactingSSTables()
+ {
+ return data.getUncompactingSSTables();
+ }
+
public long[] getRecentSSTablesPerReadHistogram()
{
return recentSSTablesPerRead.getBuckets(true);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Nov
18 16:37:39 2011
@@ -80,6 +80,11 @@ public class DataTracker
return view.get().sstables;
}
+ public Set<SSTableReader> getUncompactingSSTables()
+ {
+ return view.get().nonCompactingSStables();
+ }
+
public View getView()
{
return view.get();
@@ -276,7 +281,7 @@ public class DataTracker
do
{
currentView = view.get();
- notCompacting = Sets.difference(ImmutableSet.copyOf(currentView.sstables), currentView.compacting);
+ notCompacting = currentView.nonCompactingSStables();
newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet());
}
while (!view.compareAndSet(currentView, newView));
@@ -576,6 +581,11 @@ public class DataTracker
this.intervalTree = intervalTree;
}
+ public Sets.SetView<SSTableReader> nonCompactingSStables()
+ {
+ return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
+ }
+
private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
{
List<Interval> intervals = new ArrayList<Interval>(sstables.size());
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
Fri Nov 18 16:37:39 2011
@@ -75,18 +75,17 @@ public abstract class AbstractCompaction
public void shutdown() { }
/**
- * @return a list of compaction tasks that should run in the background to get the sstable
- * count down to desired parameters. Will not be null, but may be empty.
+ * @return the next background/minor compaction task to run; null if nothing to do.
* @param gcBefore throw away tombstones older than this
*/
- public abstract List<AbstractCompactionTask> getBackgroundTasks(final int gcBefore);
+ public abstract AbstractCompactionTask getNextBackgroundTask(final int gcBefore);
/**
- * @return a list of compaction tasks that should be run to compact this columnfamilystore
- * as much as possible. Will not be null, but may be empty.
+ * @return a compaction task that should be run to compact this columnfamilystore
+ * as much as possible. Null if nothing to do.
* @param gcBefore throw away tombstones older than this
*/
- public abstract List<AbstractCompactionTask> getMaximalTasks(final int gcBefore);
+ public abstract AbstractCompactionTask getMaximalTask(final int gcBefore);
/**
* @return a compaction task corresponding to the requested sstables.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Fri Nov 18 16:37:39 2011
@@ -107,46 +107,37 @@ public class CompactionManager implement
* It's okay to over-call (within reason) since the compactions are single-threaded,
* and if a call is unnecessary, it will just be no-oped in the bucketing phase.
*/
- public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
+ public Future<?> submitBackground(final ColumnFamilyStore cfs)
{
- Callable<Integer> callable = new Callable<Integer>()
+ Runnable runnable = new WrappedRunnable()
{
- public Integer call() throws IOException
+ protected void runMayThrow() throws IOException
{
compactionLock.readLock().lock();
try
{
- boolean taskExecuted = false;
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs));
- for (AbstractCompactionTask task : tasks)
- {
- if (!task.markSSTablesForCompaction())
- continue;
+ AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs));
+ if (task == null || !task.markSSTablesForCompaction())
+ return;
- taskExecuted = true;
- try
- {
- task.execute(executor);
- }
- finally
- {
- task.unmarkSSTables();
- }
+ try
+ {
+ task.execute(executor);
}
-
- // newly created sstables might have made other compactions eligible
- if (taskExecuted)
- submitBackground(cfs);
+ finally
+ {
+ task.unmarkSSTables();
+ }
+ submitBackground(cfs);
}
finally
{
compactionLock.readLock().unlock();
}
- return 0;
}
};
- return executor.submit(callable);
+ return executor.submit(runnable);
}
private static interface AllSSTablesOperation
@@ -242,40 +233,39 @@ public class CompactionManager implement
submitMaximal(cfStore, getDefaultGcBefore(cfStore)).get();
}
- public Future<Object> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
+ public Future<?> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
{
- Callable<Object> callable = new Callable<Object>()
+ Runnable runnable = new WrappedRunnable()
{
- public Object call() throws IOException
+ protected void runMayThrow() throws IOException
{
// acquire the write lock long enough to schedule all sstables
compactionLock.writeLock().lock();
try
{
- AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
- for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
+ AbstractCompactionTask task = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
+ if (task == null)
+ return;
+ if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
+ return;
+ try
{
- if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
- return this;
+ // downgrade the lock acquisition
+ compactionLock.readLock().lock();
+ compactionLock.writeLock().unlock();
try
{
- // downgrade the lock acquisition
- compactionLock.readLock().lock();
- compactionLock.writeLock().unlock();
- try
- {
- return task.execute(executor);
- }
- finally
- {
- compactionLock.readLock().unlock();
- }
+ task.execute(executor);
}
finally
{
- task.unmarkSSTables();
+ compactionLock.readLock().unlock();
}
}
+ finally
+ {
+ task.unmarkSSTables();
+ }
}
finally
{
@@ -283,10 +273,9 @@ public class CompactionManager implement
if (compactionLock.writeLock().isHeldByCurrentThread())
compactionLock.writeLock().unlock();
}
- return this;
}
};
- return executor.submit(callable);
+ return executor.submit(runnable);
}
public void forceUserDefinedCompaction(String ksname, String dataFiles)
@@ -322,11 +311,11 @@ public class CompactionManager implement
submitUserDefined(cfs, descriptors, getDefaultGcBefore(cfs));
}
- public Future<Object> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)
+ public Future<?> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)
{
- Callable<Object> callable = new Callable<Object>()
+ Runnable runnable = new WrappedRunnable()
{
- public Object call() throws IOException
+ protected void runMayThrow() throws IOException
{
compactionLock.readLock().lock();
try
@@ -379,8 +368,6 @@ public class CompactionManager implement
{
SSTableReader.releaseReferences(sstables);
}
-
- return this;
}
finally
{
@@ -388,7 +375,7 @@ public class CompactionManager implement
}
}
};
- return executor.submit(callable);
+ return executor.submit(runnable);
}
// This acquire a reference on the sstable
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
Fri Nov 18 16:37:39 2011
@@ -22,7 +22,6 @@ package org.apache.cassandra.db.compacti
import java.util.*;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableSet;
@@ -37,7 +36,6 @@ import org.apache.cassandra.notification
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.notifications.SSTableListChangedNotification;
-import org.apache.cassandra.service.StorageService;
public class LeveledCompactionStrategy extends AbstractCompactionStrategy
implements INotificationConsumer
{
@@ -91,31 +89,31 @@ public class LeveledCompactionStrategy e
return manifest.getLevelSize(i);
}
- public List<AbstractCompactionTask> getBackgroundTasks(int gcBefore)
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
LeveledCompactionTask currentTask = task.get();
if (currentTask != null && !currentTask.isDone())
{
logger.debug("Compaction still in progress for {}", this);
- return Collections.emptyList();
+ return null;
}
Collection<SSTableReader> sstables =
manifest.getCompactionCandidates();
if (sstables.isEmpty())
{
logger.debug("No compaction necessary for {}", this);
- return Collections.emptyList();
+ return null;
}
LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSizeInMB);
return task.compareAndSet(currentTask, newTask)
- ? Collections.<AbstractCompactionTask>singletonList(newTask)
- : Collections.<AbstractCompactionTask>emptyList();
+ ? newTask
+ : null;
}
- public List<AbstractCompactionTask> getMaximalTasks(int gcBefore)
+ public AbstractCompactionTask getMaximalTask(int gcBefore)
{
- return getBackgroundTasks(gcBefore);
+ return getNextBackgroundTask(gcBefore);
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader>
sstables, int gcBefore)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
Fri Nov 18 16:37:39 2011
@@ -46,17 +46,18 @@ public class SizeTieredCompactionStrateg
minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) :
DEFAULT_MIN_SSTABLE_SIZE;
}
- public List<AbstractCompactionTask> getBackgroundTasks(final int gcBefore)
+ public AbstractCompactionTask getNextBackgroundTask(final int gcBefore)
{
if (cfs.isCompactionDisabled())
{
logger.debug("Compaction is currently disabled.");
- return Collections.<AbstractCompactionTask>emptyList();
+ return null;
}
- List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>();
- List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getSSTables()), minSSTableSize);
+ List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getUncompactingSSTables()), minSSTableSize);
+ updateEstimatedCompactionsByTasks(buckets);
+ List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
for (List<SSTableReader> bucket : buckets)
{
if (bucket.size() < cfs.getMinimumCompactionThreshold())
@@ -69,19 +70,38 @@ public class SizeTieredCompactionStrateg
return o1.descriptor.generation - o2.descriptor.generation;
}
});
- tasks.add(new CompactionTask(cfs, bucket.subList(0, Math.min(bucket.size(), cfs.getMaximumCompactionThreshold())), gcBefore));
+ prunedBuckets.add(bucket.subList(0, Math.min(bucket.size(), cfs.getMaximumCompactionThreshold())));
}
- updateEstimatedCompactionsByTasks(tasks);
- return tasks;
+ if (prunedBuckets.isEmpty())
+ return null;
+
+ List<SSTableReader> smallestBucket = Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>()
+ {
+ public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
+ {
+ long n = avgSize(o1) - avgSize(o2);
+ if (n < 0)
+ return -1;
+ if (n > 0)
+ return 1;
+ return 0;
+ }
+
+ private long avgSize(List<SSTableReader> sstables)
+ {
+ long n = 0;
+ for (SSTableReader sstable : sstables)
+ n += sstable.bytesOnDisk();
+ return n / sstables.size();
+ }
+ });
+ return new CompactionTask(cfs, smallestBucket, gcBefore);
}
- public List<AbstractCompactionTask> getMaximalTasks(final int gcBefore)
+ public AbstractCompactionTask getMaximalTask(final int gcBefore)
{
- List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>();
- if (!cfs.getSSTables().isEmpty())
- tasks.add(new CompactionTask(cfs, cfs.getSSTables(), gcBefore));
- return tasks;
+ return cfs.getSSTables().isEmpty() ? null : new CompactionTask(cfs, cfs.getSSTables(), gcBefore);
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader>
sstables, final int gcBefore)
@@ -158,17 +178,13 @@ public class SizeTieredCompactionStrateg
return new LinkedList<List<T>>(buckets.keySet());
}
- private void updateEstimatedCompactionsByTasks(List<AbstractCompactionTask> tasks)
+ private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks)
{
int n = 0;
- for (AbstractCompactionTask task: tasks)
+ for (List<SSTableReader> bucket: tasks)
{
- if (!(task instanceof CompactionTask))
- continue;
-
- Collection<SSTableReader> sstablesToBeCompacted = task.getSSTables();
- if (sstablesToBeCompacted.size() >= cfs.getMinimumCompactionThreshold())
- n += Math.ceil((double)sstablesToBeCompacted.size() / cfs.getMaximumCompactionThreshold());
+ if (bucket.size() >= cfs.getMinimumCompactionThreshold())
+ n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
}
estimatedRemainingTasks = n;
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
Fri Nov 18 16:37:39 2011
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
public class CompactionsTest extends CleanupHelper
{
@@ -140,20 +141,16 @@ public class CompactionsTest extends Cle
store.setMaximumCompactionThreshold(4);
// loop submitting parallel compactions until they all return 0
- while (true)
+ do
{
- ArrayList<Future<Integer>> compactions = new ArrayList<Future<Integer>>();
+ ArrayList<Future<?>> compactions = new ArrayList<Future<?>>();
for (int i = 0; i < 10; i++)
compactions.add(CompactionManager.instance.submitBackground(store));
// another compaction attempt will be launched in the background by
// each completing compaction: not much we can do to control them here
- boolean progress = false;
- for (Future<Integer> compaction : compactions)
- if (compaction.get() > 0)
- progress = true;
- if (!progress)
- break;
- }
+ FBUtilities.waitOnFutures(compactions);
+ } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0);
+
if (store.getSSTables().size() > 1)
{
CompactionManager.instance.performMaximal(store);