merge from 1.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/64a5e70e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/64a5e70e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/64a5e70e

Branch: refs/heads/trunk
Commit: 64a5e70ef5aacdfe244baa91b5698a326173082f
Parents: 1bfb685 853a759
Author: Jonathan Ellis <[email protected]>
Authored: Fri May 25 16:23:52 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Fri May 25 16:23:52 2012 -0500

----------------------------------------------------------------------
 .../db/compaction/AbstractCompactionTask.java      |    5 ++
 .../cassandra/db/compaction/CompactionManager.java |   21 +++++++-
 .../cassandra/db/compaction/LeveledManifest.java   |   10 +++-
 .../compaction/SizeTieredCompactionStrategy.java   |   40 +++++++--------
 4 files changed, 51 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/64a5e70e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f8bab20,872ce0b..e09a012
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -111,31 -109,54 +111,50 @@@ public class CompactionManager implemen
      * It's okay to over-call (within reason) since the compactions are single-threaded,
      * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
       */
 -    public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
 +    public Future<?> submitBackground(final ColumnFamilyStore cfs)
      {
+         logger.debug("Scheduling a background task check for {}.{} with {}",
+                      new Object[] {cfs.table.name,
+                                    cfs.columnFamily,
+                                    cfs.getCompactionStrategy().getClass().getSimpleName()});
 -        Callable<Integer> callable = new Callable<Integer>()
 +        Runnable runnable = new WrappedRunnable()
          {
 -            public Integer call() throws IOException
 +            protected void runMayThrow() throws IOException
              {
                  compactionLock.readLock().lock();
                  try
                  {
+                     logger.debug("Checking {}.{}", cfs.table.name, cfs.columnFamily); // log after we get the lock so we can see delays from that if any
+                     if (!cfs.isValid())
+                     {
+                         logger.debug("Aborting compaction for dropped CF");
 -                        return 0;
++                        return;
+                     }
+ 
 -                    boolean taskExecuted = false;
                     AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 -                    List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs));
 -                    logger.debug("{} minor compaction tasks available", tasks.size());
 -                    for (AbstractCompactionTask task : tasks)
 +                    AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs));
-                     if (task == null || !task.markSSTablesForCompaction())
++                    if (task == null)
+                     {
 -                        if (!task.markSSTablesForCompaction())
 -                        {
 -                        logger.debug("Skipping {}; sstables are busy", task);
 -                            continue;
 -                        }
 -
 -                        taskExecuted = true;
 -                        try
 -                        {
 -                            task.execute(executor);
 -                        }
 -                        finally
 -                        {
 -                            task.unmarkSSTables();
 -                        }
++                        logger.debug("No tasks available");
 +                        return;
++                    }
++                    if (!task.markSSTablesForCompaction())
++                    {
++                        logger.debug("Unable to mark SSTables for {}", task);
++                        return;
+                     }
  
 -                    // newly created sstables might have made other compactions eligible
 -                    if (taskExecuted)
 -                        submitBackground(cfs);
 +                    try
 +                    {
 +                        task.execute(executor);
 +                    }
 +                    finally
 +                    {
 +                        task.unmarkSSTables();
 +                    }
 +                    submitBackground(cfs);
                  }
 -                finally 
 +                finally
                  {
                      compactionLock.readLock().unlock();
                  }

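The hunk above replaces the old fetch-all-tasks loop with a single-task flow: ask the strategy for at most one background task, mark its sstables, execute, unmark, and re-submit, since the sstables just written may make further compactions eligible. Below is a minimal, self-contained sketch of that control flow (locking omitted), not the project code; BackgroundTask and TaskSource are hypothetical stand-ins for AbstractCompactionTask and AbstractCompactionStrategy.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class BackgroundLoopSketch
    {
        // hypothetical stand-in for AbstractCompactionTask
        interface BackgroundTask
        {
            boolean markSSTablesForCompaction(); // false if the sstables are busy
            void execute();
            void unmarkSSTables();
        }

        // hypothetical stand-in for AbstractCompactionStrategy
        interface TaskSource
        {
            BackgroundTask getNextBackgroundTask(); // null when nothing is eligible
        }

        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        public Future<?> submitBackground(final TaskSource strategy)
        {
            return executor.submit(new Runnable()
            {
                public void run()
                {
                    BackgroundTask task = strategy.getNextBackgroundTask();
                    if (task == null)
                        return;                          // no tasks available
                    if (!task.markSSTablesForCompaction())
                        return;                          // sstables busy; a later check retries
                    try
                    {
                        task.execute();
                    }
                    finally
                    {
                        task.unmarkSSTables();
                    }
                    // output of this compaction may enable further compactions
                    submitBackground(strategy);
                }
            });
        }
    }

This is why over-calling stays cheap, as the javadoc above notes: a redundant check either finds no task or fails to mark busy sstables and returns immediately.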
http://git-wip-us.apache.org/repos/asf/cassandra/blob/64a5e70e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 28700da,7e06848..b2ba857
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@@ -468,10 -453,12 +470,12 @@@ public class LeveledManifes
          for (int i = generations.length - 1; i >= 0; i--)
          {
              List<SSTableReader> sstables = generations[i];
-             long n = Math.max(0L, SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / (maxSSTableSizeInMB * 1024L * 1024);
-             logger.debug("Estimating " + n + " compaction tasks in level " + i);
-             tasks += n;
 -            estimated[i] = Math.max(0L, SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / (maxSSTableSizeInMB * 1024 * 1024);
++            estimated[i] = Math.max(0L, SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / (maxSSTableSizeInMB * 1024L * 1024);
+             tasks += estimated[i];
          }
+ 
+         logger.debug("Estimating {} compactions to do for {}.{}",
+                      new Object[] {Arrays.asList(estimated), cfs.table.name, cfs.columnFamily});
          return Ints.checkedCast(tasks);
      }
  }

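A detail worth calling out in the merged estimate line: the ++ line keeps the long literal (1024L) when converting maxSSTableSizeInMB to bytes, where the removed parent line used plain int literals. With int literals the product is evaluated in 32-bit arithmetic and silently overflows once maxSSTableSizeInMB reaches 2048. A small self-contained demonstration (variable name illustrative only):

    public class OverflowSketch
    {
        public static void main(String[] args)
        {
            int maxSSTableSizeInMB = 4096;

            // evaluated entirely in int and wraps to 0 before widening to long
            long intBytes = maxSSTableSizeInMB * 1024 * 1024;

            // the long literal forces 64-bit arithmetic from the first multiply
            long longBytes = maxSSTableSizeInMB * 1024L * 1024;

            System.out.println(intBytes);  // prints 0
            System.out.println(longBytes); // prints 4294967296
        }
    }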
http://git-wip-us.apache.org/repos/asf/cassandra/blob/64a5e70e/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 0281f08,636d6ba..63ec442
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@@ -52,14 -53,13 +52,15 @@@ public class SizeTieredCompactionStrate
          if (cfs.isCompactionDisabled())
          {
              logger.debug("Compaction is currently disabled.");
 -            return Collections.emptyList();
 +            return null;
          }
  
 -        List<AbstractCompactionTask> tasks = new ArrayList<AbstractCompactionTask>();
 -        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getSSTables()), minSSTableSize);
 +        Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
 +        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), minSSTableSize);
+         logger.debug("Compaction buckets are {}", buckets);
 +        updateEstimatedCompactionsByTasks(buckets);
  
 +        List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
          for (List<SSTableReader> bucket : buckets)
          {
              if (bucket.size() < cfs.getMinimumCompactionThreshold())
@@@ -149,43 -132,43 +151,39 @@@
              // look for a bucket containing similar-sized files:
             // group in the same bucket if it's w/in 50% of the average for this bucket,
             // or this file and the bucket are all considered "small" (less than `minSSTableSize`)
-             for (Entry<List<T>, Long> entry : buckets.entrySet())
+             for (Entry<Long, List<T>> entry : buckets.entrySet())
              {
-                 List<T> bucket = entry.getKey();
-                 long averageSize = entry.getValue();
-                 if ((size > (averageSize / 2) && size < (3 * averageSize) / 2)
-                     || (size < minSSTableSize && averageSize < minSSTableSize))
+                 List<T> bucket = entry.getValue();
+                 long oldAverageSize = entry.getKey();
+                 if ((size > (oldAverageSize / 2) && size < (3 * oldAverageSize) / 2)
+                     || (size < minSSTableSize && oldAverageSize < minSSTableSize))
                  {
-                     // remove and re-add because adding changes the hash
-                     buckets.remove(bucket);
-                     long totalSize = bucket.size() * averageSize;
-                     averageSize = (totalSize + size) / (bucket.size() + 1);
+                     // remove and re-add under the new average size
+                     buckets.remove(oldAverageSize);
+                     long totalSize = bucket.size() * oldAverageSize;
+                     long newAverageSize = (totalSize + size) / (bucket.size() + 1);
                      bucket.add(pair.left);
-                     buckets.put(bucket, averageSize);
-                     bFound = true;
-                     break;
+                     buckets.put(newAverageSize, bucket);
+                     continue outer;
                  }
              }
+ 
              // no similar bucket found; put it in a new one
-             if (!bFound)
-             {
-                 ArrayList<T> bucket = new ArrayList<T>();
-                 bucket.add(pair.left);
-                 buckets.put(bucket, size);
-             }
+             ArrayList<T> bucket = new ArrayList<T>();
+             bucket.add(pair.left);
+             buckets.put(size, bucket);
          }
  
-         return new LinkedList<List<T>>(buckets.keySet());
+         return new ArrayList<List<T>>(buckets.values());
      }
  
 -    private void updateEstimatedCompactionsByTasks(List<AbstractCompactionTask> tasks)
 +    private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks)
      {
          int n = 0;
 -        for (AbstractCompactionTask task: tasks)
 +        for (List<SSTableReader> bucket: tasks)
          {
 -            if (!(task instanceof CompactionTask))
 -                continue;
 -
 -            Collection<SSTableReader> sstablesToBeCompacted = task.getSSTables();
 -            if (sstablesToBeCompacted.size() >= cfs.getMinimumCompactionThreshold())
 -                n += Math.ceil((double)sstablesToBeCompacted.size() / cfs.getMaximumCompactionThreshold());
 +            if (bucket.size() >= cfs.getMinimumCompactionThreshold())
 +                n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
          }
          estimatedRemainingTasks = n;
      }

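To make the re-keyed bucketing in getBuckets() above easier to follow: buckets now live in a map keyed by their running average size; a file joins a bucket when its size is within 50% of that average, or when both the file and the bucket are "small" (below minSSTableSize), and the bucket is then re-inserted under its updated key. The following is a self-contained sketch of the same rule over plain long sizes; every name in it is local to the sketch, not the project code.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BucketSketch
    {
        public static List<List<Long>> getBuckets(List<Long> sizes, long minSSTableSize)
        {
            Map<Long, List<Long>> buckets = new HashMap<Long, List<Long>>();
            outer:
            for (long size : sizes)
            {
                for (Map.Entry<Long, List<Long>> entry : buckets.entrySet())
                {
                    long oldAverageSize = entry.getKey();
                    List<Long> bucket = entry.getValue();
                    if ((size > oldAverageSize / 2 && size < (3 * oldAverageSize) / 2)
                        || (size < minSSTableSize && oldAverageSize < minSSTableSize))
                    {
                        // re-key the bucket under its new average; modifying the
                        // map is safe here because we leave the loop immediately
                        buckets.remove(oldAverageSize);
                        long newAverageSize = (bucket.size() * oldAverageSize + size) / (bucket.size() + 1);
                        bucket.add(size);
                        buckets.put(newAverageSize, bucket);
                        continue outer;
                    }
                }
                // no similar bucket found; start a new one keyed by this size
                List<Long> bucket = new ArrayList<Long>();
                bucket.add(size);
                buckets.put(size, bucket);
            }
            return new ArrayList<List<Long>>(buckets.values());
        }

        public static void main(String[] args)
        {
            // e.g. [[110, 100, 105], [1000]]: 1000 is not within 50% of the
            // ~105 average, so it starts its own bucket
            System.out.println(getBuckets(Arrays.asList(110L, 100L, 105L, 1000L), 50L));
        }
    }

Keying the map by average size also replaces the old scheme of keying by the bucket list itself, which had to be removed and re-added on every change because adding an element changed the list's hashCode.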