Author: jbellis
Date: Wed Dec 23 19:20:16 2009
New Revision: 893605

URL: http://svn.apache.org/viewvc?rev=893605&view=rev
Log:
make estimation of pendingtasks for CompactionManager sane.  patch by jbellis; 
reviewed by eevans for CASSANDRA-599

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Wed Dec 23 19:20:16 2009
@@ -559,7 +559,7 @@
     public void addSSTable(SSTableReader sstable)
     {
         ssTables_.add(sstable);
-        CompactionManager.instance.submitMinor(this);
+        CompactionManager.instance.submitMinorIfNeeded(this);
     }
 
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
 Wed Dec 23 19:20:16 2009
@@ -23,13 +23,12 @@
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import javax.management.*;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -37,6 +36,7 @@
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import java.net.InetAddress;
 
@@ -68,44 +68,56 @@
         }
     }
 
-    private ExecutorService compactor_ = new 
JMXEnabledThreadPoolExecutor("COMPACTION-POOL");
+    private CompactionExecutor executor = new CompactionExecutor();
+    private Map<ColumnFamilyStore, Integer> estimatedCompactions = new 
NonBlockingHashMap<ColumnFamilyStore, Integer>();
 
     /**
      * Call this whenever a compaction might be needed on the given 
columnfamily.
      * It's okay to over-call (within reason) since the compactions are 
single-threaded,
      * and if a call is unnecessary, it will just be no-oped in the bucketing 
phase.
      */
-    public Future<Integer> submitMinor(final ColumnFamilyStore cfs)
+    public Future<Integer> submitMinorIfNeeded(final ColumnFamilyStore cfs)
     {
         Callable<Integer> callable = new Callable<Integer>()
         {
             public Integer call() throws IOException
             {
-                int filesCompacted = 0;
-                if (minimumCompactionThreshold > 0 && 
maximumCompactionThreshold > 0)
+                if (minimumCompactionThreshold <= 0 || 
maximumCompactionThreshold <= 0)
                 {
-                    logger.debug("Checking to see if compaction of " + 
cfs.columnFamily_ + " would be useful");
-                    for (List<SSTableReader> sstables : 
getCompactionBuckets(cfs.getSSTables(), 50L * 1024L * 1024L))
+                    logger.debug("Compaction is currently disabled.");
+                    return 0;
+                }
+                logger.debug("Checking to see if compaction of " + 
cfs.columnFamily_ + " would be useful");
+                Set<List<SSTableReader>> buckets = 
getCompactionBuckets(cfs.getSSTables(), 50L * 1024L * 1024L);
+                updateEstimateFor(cfs, buckets);
+                
+                for (List<SSTableReader> sstables : buckets)
+                {
+                    if (sstables.size() >= minimumCompactionThreshold)
                     {
-                        if (sstables.size() < minimumCompactionThreshold)
-                        {
-                            continue;
-                        }
                         // if we have too many to compact all at once, compact 
older ones first -- this avoids
                         // re-compacting files we just created.
                         Collections.sort(sstables);
-                        filesCompacted += doCompaction(cfs, 
sstables.subList(0, Math.min(sstables.size(), maximumCompactionThreshold)), 
getDefaultGCBefore());
+                        return doCompaction(cfs, sstables.subList(0, 
Math.min(sstables.size(), maximumCompactionThreshold)), getDefaultGCBefore());
                     }
-                    logger.debug(filesCompacted + " files compacted");
                 }
-                else
-                {
-                    logger.debug("Compaction is currently disabled.");
-                }
-                return filesCompacted;
+                return 0;
             }
         };
-        return compactor_.submit(callable);
+        return executor.submit(callable);
+    }
+
+    private void updateEstimateFor(ColumnFamilyStore cfs, 
Set<List<SSTableReader>> buckets)
+    {
+        int n = 0;
+        for (List<SSTableReader> sstables : buckets)
+        {
+            if (sstables.size() >= minimumCompactionThreshold)
+            {
+                n += 1 + sstables.size() / (maximumCompactionThreshold - 
minimumCompactionThreshold);
+            }
+        }
+        estimatedCompactions.put(cfs, n);
     }
 
     public Future<Object> submitCleanup(final ColumnFamilyStore cfStore)
@@ -118,7 +130,7 @@
                 return this;
             }
         };
-        return compactor_.submit(runnable);
+        return executor.submit(runnable);
     }
 
     public Future<List<SSTableReader>> submitAnticompaction(final 
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress 
target)
@@ -130,7 +142,7 @@
                 return doAntiCompaction(cfStore, cfStore.getSSTables(), 
ranges, target);
             }
         };
-        return compactor_.submit(callable);
+        return executor.submit(callable);
     }
 
     public Future submitMajor(final ColumnFamilyStore cfStore)
@@ -165,7 +177,7 @@
                 return this;
             }
         };
-        return compactor_.submit(callable);
+        return executor.submit(callable);
     }
 
     public Future submitReadonly(final ColumnFamilyStore cfStore, final 
InetAddress initiator)
@@ -178,7 +190,7 @@
                 return this;
             }
         };
-        return compactor_.submit(callable);
+        return executor.submit(callable);
     }
 
     /**
@@ -258,6 +270,7 @@
         SSTableWriter writer;
         CompactionIterator ci = new CompactionIterator(sstables, gcBefore, 
major); // retain a handle so we can call close()
         Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, 
PredicateUtils.notNullPredicate());
+        executor.beginCompaction(cfs, ci);
 
         try
         {
@@ -293,7 +306,7 @@
         SSTableReader ssTable = 
writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table.name));
         cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
         gcAfterRpcTimeout();
-        instance.submitMinor(cfs);
+        submitMinorIfNeeded(cfs);
 
         String format = "Compacted to %s.  %d/%d bytes for %d keys.  Time: 
%dms.";
         long dTime = System.currentTimeMillis() - startTime;
@@ -341,6 +354,7 @@
         SSTableWriter writer = null;
         CompactionIterator ci = new AntiCompactionIterator(sstables, ranges, 
getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
         Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, 
PredicateUtils.notNullPredicate());
+        executor.beginCompaction(cfs, ci);
 
         try
         {
@@ -403,6 +417,7 @@
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
         CompactionIterator ci = new CompactionIterator(sstables, 
getDefaultGCBefore(), true);
+        executor.beginCompaction(cfs, ci);
         try
         {
             Iterator<CompactionIterator.CompactedRow> nni = new 
FilterIterator(ci, PredicateUtils.notNullPredicate());
@@ -528,21 +543,99 @@
         }
     }
 
-    public void checkAllColumnFamilies()
+    public void checkAllColumnFamilies() throws IOException
     {
+        // perform estimates
         for (String tableName : DatabaseDescriptor.getTables())
         {
-            try
+            for (final ColumnFamilyStore cfs : 
Table.open(tableName).getColumnFamilyStores().values())
             {
-                for (ColumnFamilyStore cfs : 
Table.open(tableName).getColumnFamilyStores().values())
+                Runnable runnable = new Runnable()
                 {
-                    submitMinor(cfs);
-                }
+                    public void run ()
+                    {
+                        logger.debug("Estimating compactions for " + 
cfs.columnFamily_);
+                        final Set<List<SSTableReader>> buckets = 
getCompactionBuckets(cfs.getSSTables(), 50L * 1024L * 1024L);
+                        updateEstimateFor(cfs, buckets);
+                    }
+                };
+                executor.submit(runnable);
             }
-            catch (IOException e)
+        }
+        
+        // actually schedule compactions.  done in a second pass so all the 
estimates occur before we
+        // bog down the executor in actual compactions.
+        for (String tableName : DatabaseDescriptor.getTables())
+        {
+            for (final ColumnFamilyStore cfs : 
Table.open(tableName).getColumnFamilyStores().values())
             {
-                throw new RuntimeException(e);
+                submitMinorIfNeeded(cfs);
             }
         }
     }
+
+    private class CompactionExecutor extends DebuggableThreadPoolExecutor
+    {
+        private volatile ColumnFamilyStore cfs;
+        private volatile CompactionIterator ci;
+
+        public CompactionExecutor()
+        {
+            super("COMPACTION-POOL");
+        }
+
+        @Override
+        public void afterExecute(Runnable r, Throwable t)
+        {
+            super.afterExecute(r, t);
+            cfs = null;
+            ci = null;
+        }
+
+        void beginCompaction(ColumnFamilyStore cfs, CompactionIterator ci)
+        {
+            this.cfs = cfs;
+            this.ci = ci;
+        }
+
+        public String getColumnFamilyName()
+        {
+            return cfs == null ? null : cfs.getColumnFamilyName();
+        }
+
+        public Long getBytesTotal()
+        {
+            return ci == null ? null : ci.getTotalBytes();
+        }
+
+        public Long getBytesCompleted()
+        {
+            return ci == null ? null : ci.getBytesRead();
+        }
+    }
+
+    public String getColumnFamilyInProgress()
+    {
+        return executor.getColumnFamilyName();
+    }
+
+    public Long getBytesTotalInProgress()
+    {
+        return executor.getBytesTotal();
+    }
+
+    public Long getBytesCompacted()
+    {
+        return executor.getBytesCompleted();
+    }
+
+    public int getPendingTasks()
+    {
+        int n = 0;
+        for (Integer i : estimatedCompactions.values())
+        {
+            n += i;
+        }
+        return n;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
 Wed Dec 23 19:20:16 2009
@@ -39,4 +39,24 @@
      * Sets the maximum number of sstables in queue before compaction kicks off
      */
     public void setMaximumCompactionThreshold(int threshold);
+
+    /**
+     * @return the columnfamily currently being compacted; null if none
+     */
+    public String getColumnFamilyInProgress();
+
+    /**
+     * @return the total (data, not including index and filter) bytes being 
compacted; null if none
+     */
+    public Long getBytesTotalInProgress();
+
+    /**
+     * @return the progress on the current compaction; null if none
+     */
+    public Long getBytesCompacted();
+
+    /**
+     * @return estimated number of compactions remaining to perform
+     */
+    public int getPendingTasks();
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
 Wed Dec 23 19:20:16 2009
@@ -45,10 +45,15 @@
 
     private final List<IteratingRow> rows = new ArrayList<IteratingRow>();
     private final int gcBefore;
-    private boolean major;
+    private final boolean major;
+
+    private long totalBytes;
+    private long bytesRead;
+    private long row;
 
     public CompactionIterator(Iterable<SSTableReader> sstables, int gcBefore, 
boolean major) throws IOException
     {
+<<<<<<< HEAD
         this(getCollatingIterator(sstables), gcBefore, major);
     }
 
@@ -56,6 +61,15 @@
     protected CompactionIterator(Iterator iter, int gcBefore, boolean major)
     {
         super(iter);
+=======
+        super(getCollatingIterator(sstables));
+        row = 0;
+        totalBytes = bytesRead = 0;
+        for (SSTableScanner iter : 
(List<SSTableScanner>)((CollatingIterator)source).getIterators())
+        {
+            totalBytes += iter.getFileLength();
+        }
+>>>>>>> make estimation of pendingtasks for CompactionManager sane
         this.gcBefore = gcBefore;
         this.major = major;
     }
@@ -135,6 +149,14 @@
         finally
         {
             rows.clear();
+            if ((row++ % 1000) == 0)
+            {
+                bytesRead = 0;
+                for (SSTableScanner iter : 
(List<SSTableScanner>)((CollatingIterator)source).getIterators())
+                {
+                    bytesRead += iter.getFilePointer();
+                }
+            }
         }
         return new CompactedRow(key, buffer);
     }
@@ -147,6 +169,16 @@
         }
     }
 
+    public long getTotalBytes()
+    {
+        return totalBytes;
+    }
+
+    public long getBytesRead()
+    {
+        return bytesRead;
+    }
+
     public static class CompactedRow
     {
         public final DecoratedKey key;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed 
Dec 23 19:20:16 2009
@@ -198,11 +198,6 @@
         }
     }
 
-    public long dataBytesOnDisk()
-    {
-        return new File(path).length();
-    }
-
     public long bytesOnDisk()
     {
         long bytes = 0;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java 
Wed Dec 23 19:20:16 2009
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.Closeable;
+import java.io.IOError;
 import java.util.Iterator;
 import java.util.Arrays;
 
@@ -72,6 +73,23 @@
         }
     }
 
+    public long getFileLength()
+    {
+        try
+        {
+            return file.length();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public long getFilePointer()
+    {
+        return file.getFilePointer();
+    }
+
     public boolean hasNext()
     {
         if (iterator == null)

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
 (original)
+++ 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
 Wed Dec 23 19:20:16 2009
@@ -66,7 +66,7 @@
         }
         while (true)
         {
-            Future<Integer> ft = CompactionManager.instance.submitMinor(store);
+            Future<Integer> ft = 
CompactionManager.instance.submitMinorIfNeeded(store);
             if (ft.get() == 0)
                 break;
         }


Reply via email to