Author: jbellis
Date: Wed Dec 23 19:18:05 2009
New Revision: 893600

URL: http://svn.apache.org/viewvc?rev=893600&view=rev
Log:
clean up compactions api and move into CompactionManager.  patch by jbellis; 
reviewed by eevans for CASSANDRA-599

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java

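In short: callers no longer invoke compaction methods on ColumnFamilyStore
directly; they submit work to the CompactionManager singleton and block on the
returned Future when they need the result.  A minimal sketch of the new call
pattern follows ("Keyspace1"/"Standard1" are example names from the test
configuration, not part of the API; this is an illustration, not code from the
commit):

    try
    {
        // hypothetical usage sketch of the post-CASSANDRA-599 API
        ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Standard1");

        // fire-and-forget minor compaction; thresholds are applied in the bucketing phase
        CompactionManager.instance.submitMinor(cfs);

        // block until a major compaction (skip = 0, default gcBefore) completes
        CompactionManager.instance.submitMajor(cfs).get();
    }
    catch (Exception e)
    {
        throw new RuntimeException(e);
    }
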
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Dec 23 19:18:05 2009
@@ -35,7 +35,6 @@
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.*;
 import java.net.InetAddress;
 import java.util.regex.Matcher;
@@ -49,7 +48,6 @@
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.collections.PredicateUtils;
@@ -292,31 +290,6 @@
         sb.append(newLineSeparator);
         return sb.toString();
     }
-    
-    /*
-     * This method forces a compaction of the SSTables on disk. We wait
-     * for the process to complete by waiting on a future pointer.
-    */
-    List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
-    {
-        assert ranges != null;
-        Future<List<SSTableReader>> futurePtr = CompactionManager.instance.submitAnti(ColumnFamilyStore.this,
-                                                                                       ranges, target);
-
-        List<SSTableReader> result;
-        try
-        {
-            /* Waiting for the compaction to complete. */
-            result = futurePtr.get();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Done forcing compaction ...");
-        }
-        catch (Exception ex)
-        {
-            throw new RuntimeException(ex);
-        }
-        return result;
-    }
 
     /**
      * @return the name of the column family
@@ -501,12 +474,7 @@
      */
     static ColumnFamily removeDeleted(ColumnFamily cf)
     {
-        return removeDeleted(cf, getDefaultGCBefore());
-    }
-
-    public static int getDefaultGCBefore()
-    {
-        return (int)(System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds();
+        return removeDeleted(cf, CompactionManager.getDefaultGCBefore());
     }
 
     public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
@@ -597,109 +565,6 @@
     }
 
     /*
-     * Group files of similar size into buckets.
-     */
-    static Set<List<SSTableReader>> getCompactionBuckets(Iterable<SSTableReader> files, long min)
-    {
-        Map<List<SSTableReader>, Long> buckets = new HashMap<List<SSTableReader>, Long>();
-        for (SSTableReader sstable : files)
-        {
-            long size = sstable.length();
-
-            boolean bFound = false;
-            // look for a bucket containing similar-sized files:
-            // group in the same bucket if it's w/in 50% of the average for this bucket,
-            // or this file and the bucket are all considered "small" (less than `min`)
-            for (List<SSTableReader> bucket : buckets.keySet())
-            {
-                long averageSize = buckets.get(bucket);
-                if ((size > averageSize / 2 && size < 3 * averageSize / 2)
-                    || (size < min && averageSize < min))
-                {
-                    // remove and re-add because adding changes the hash
-                    buckets.remove(bucket);
-                    averageSize = (averageSize + size) / 2;
-                    bucket.add(sstable);
-                    buckets.put(bucket, averageSize);
-                    bFound = true;
-                    break;
-                }
-            }
-            // no similar bucket found; put it in a new one
-            if (!bFound)
-            {
-                ArrayList<SSTableReader> bucket = new ArrayList<SSTableReader>();
-                bucket.add(sstable);
-                buckets.put(bucket, size);
-            }
-        }
-
-        return buckets.keySet();
-    }
-
-    /*
-     * Break the files into buckets and then compact.
-     */
-    int doCompaction(int minThreshold, int maxThreshold) throws IOException
-    {
-        int filesCompacted = 0;
-        if (minThreshold > 0 && maxThreshold > 0)
-        {
-            logger_.debug("Checking to see if compaction of " + columnFamily_ 
+ " would be useful");
-            for (List<SSTableReader> sstables : 
getCompactionBuckets(ssTables_, 50L * 1024L * 1024L))
-            {
-                if (sstables.size() < minThreshold)
-                {
-                    continue;
-                }
-                // if we have too many to compact all at once, compact older ones first -- this avoids
-                // re-compacting files we just created.
-                Collections.sort(sstables);
-                filesCompacted += doFileCompaction(sstables.subList(0, Math.min(sstables.size(), maxThreshold)));
-            }
-            logger_.debug(filesCompacted + " files compacted");
-        }
-        else
-        {
-            logger_.debug("Compaction is currently disabled.");
-        }
-        return filesCompacted;
-    }
-
-    void doMajorCompaction(long skip) throws IOException
-    {
-        doMajorCompactionInternal(skip);
-    }
-
-    /*
-     * Compact all the files irrespective of the size.
-     * skip : is the amount in GB of the files to be skipped
-     * all files greater than skip GB are skipped for this compaction.
-     * Except if skip is 0 , in that case this is ignored and all files are taken.
-     */
-    void doMajorCompactionInternal(long skip) throws IOException
-    {
-        Collection<SSTableReader> sstables;
-        if (skip > 0)
-        {
-            sstables = new ArrayList<SSTableReader>();
-            for (SSTableReader sstable : ssTables_)
-            {
-                if (sstable.length() < skip * 1024L * 1024L * 1024L)
-                {
-                    sstables.add(sstable);
-                }
-            }
-        }
-        else
-        {
-            sstables = ssTables_.getSSTables();
-        }
-
-        doFileCompaction(sstables);
-    }
-
-    /*
      * Add up all the files sizes this is the worst case file
      * size for compaction of all the list of files given.
      */
@@ -732,277 +597,41 @@
         return maxFile;
     }
 
-    List<SSTableReader> doAntiCompaction(Collection<Range> ranges, InetAddress target) throws IOException
-    {
-        return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target);
-    }
-
     void forceCleanup()
     {
         CompactionManager.instance.submitCleanup(ColumnFamilyStore.this);
     }
 
-    /**
-     * This function goes over each file and removes the keys that the node is not responsible for
-     * and only keeps keys that this node is responsible for.
-     *
-     * @throws IOException
-     */
-    void doCleanupCompaction() throws IOException
-    {
-        for (SSTableReader sstable : ssTables_)
-        {
-            doCleanup(sstable);
-        }
-        gcAfterRpcTimeout();
-    }
-
-    /**
-     * cleans up one particular file by removing keys that this node is not responsible for.
-     * @throws IOException
-     */
-    /* TODO: Take care of the comments later. */
-    void doCleanup(SSTableReader sstable) throws IOException
-    {
-        assert sstable != null;
-        List<SSTableReader> sstables = doFileAntiCompaction(Arrays.asList(sstable), StorageService.instance().getLocalRanges(), null);
-        if (!sstables.isEmpty())
-        {
-            assert sstables.size() == 1;
-            addSSTable(sstables.get(0));
-        }
-        if (logger_.isDebugEnabled())
-          logger_.debug("Original file : " + sstable + " of size " + 
sstable.length());
-        ssTables_.markCompacted(Arrays.asList(sstable));
-    }
-
-    /**
-     * This function is used to do the anti compaction process , it spits out the file which has keys that belong to a given range
-     * If the target is not specified it spits out the file as a compacted file with the unecessary ranges wiped out.
-     *
-     * @param sstables
-     * @param ranges
-     * @param target
-     * @return
-     * @throws IOException
-     */
-    List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader> sstables, final Collection<Range> ranges, InetAddress target) throws IOException
+    public Table getTable()
     {
-        logger_.info("AntiCompacting [" + StringUtils.join(sstables, ",") + 
"]");
-        // Calculate the expected compacted filesize
-        long expectedRangeFileSize = getExpectedCompactedFileSize(sstables) / 2;
-        String compactionFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, expectedRangeFileSize);
-        if (compactionFileLocation == null)
-        {
-            throw new UnsupportedOperationException("disk full");
-        }
-        if (target != null)
-        {
-            // compacting for streaming: send to subdirectory
-            compactionFileLocation = compactionFileLocation + File.separator + DatabaseDescriptor.STREAMING_SUBDIR;
-        }
-        List<SSTableReader> results = new ArrayList<SSTableReader>();
-
-        long startTime = System.currentTimeMillis();
-        long totalkeysWritten = 0;
-
-        int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
-        if (logger_.isDebugEnabled())
-          logger_.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
-
-        SSTableWriter writer = null;
-       CompactionIterator ci = new AntiCompactionIterator(sstables, ranges, getDefaultGCBefore(), sstables.size() == ssTables_.size());
-        Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
-
         try
         {
-            if (!nni.hasNext())
-            {
-                return results;
-            }
-
-            while (nni.hasNext())
-            {
-                CompactionIterator.CompactedRow row = nni.next();
-               if (writer == null)
-               {
-                   FileUtils.createDirectory(compactionFileLocation);
-                   String newFilename = new File(compactionFileLocation, getTempSSTableFileName()).getAbsolutePath();
-                   writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
-               }
-               writer.append(row.key, row.buffer);
-               totalkeysWritten++;
-            }
-        }
-        finally
-        {
-            ci.close();
+            return Table.open(table_);
         }
-
-        if (writer != null)
+        catch (IOException e)
         {
-            results.add(writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table_)));
-            String format = "AntiCompacted to %s.  %d/%d bytes for %d keys.  Time: %dms.";
-            long dTime = System.currentTimeMillis() - startTime;
-            logger_.info(String.format(format, writer.getFilename(), getTotalBytes(sstables), results.get(0).length(), totalkeysWritten, dTime));
+            throw new RuntimeException(e);
         }
-
-        return results;
-    }
-
-    private int doFileCompaction(Collection<SSTableReader> sstables) throws IOException
-    {
-        return doFileCompaction(sstables, getDefaultGCBefore());
     }
 
-    /*
-    * This function does the actual compaction for files.
-    * It maintains a priority queue of with the first key from each file
-    * and then removes the top of the queue and adds it to the SStable and
-    * repeats this process while reading the next from each file until its
-    * done with all the files . The SStable to which the keys are written
-    * represents the new compacted file. Before writing if there are keys
-    * that occur in multiple files and are the same then a resolution is done
-    * to get the latest data.
-    *
-    * The collection of sstables passed may be empty (but not null); even if
-    * it is not empty, it may compact down to nothing if all rows are deleted.
-    */
-    int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore) throws IOException
+    void markCompacted(Collection<SSTableReader> sstables) throws IOException
     {
-        if (DatabaseDescriptor.isSnapshotBeforeCompaction())
-            Table.open(table_).snapshot("compact-" + columnFamily_);
-        logger_.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
-        String compactionFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, getExpectedCompactedFileSize(sstables));
-        // If the compaction file path is null that means we have no space left for this compaction.
-        // try again w/o the largest one.
-        if (compactionFileLocation == null)
-        {
-            SSTableReader maxFile = getMaxSizeFile(sstables);
-            List<SSTableReader> smallerSSTables = new ArrayList<SSTableReader>(sstables);
-            smallerSSTables.remove(maxFile);
-            return doFileCompaction(smallerSSTables, gcBefore);
-        }
-
-        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
-        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
-        // all the sstables (that existed when we started)
-        boolean major = sstables.size() == ssTables_.size();
-
-        long startTime = System.currentTimeMillis();
-        long totalkeysWritten = 0;
-
-        // TODO the int cast here is potentially buggy
-        int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
-        if (logger_.isDebugEnabled())
-          logger_.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
-
-        SSTableWriter writer;
-        CompactionIterator ci = new CompactionIterator(sstables, gcBefore, major); // retain a handle so we can call close()
-        Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
-
-        try
-        {
-            if (!nni.hasNext())
-            {
-                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                // we need to sync it (via closeAndOpen) first, so there is no period during which
-                // a crash could cause data loss.
-                ssTables_.markCompacted(sstables);
-                return 0;
-            }
-
-            String newFilename = new File(compactionFileLocation, getTempSSTableFileName()).getAbsolutePath();
-            writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
-
-            // validate the CF as we iterate over it
-            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, null, major);
-            validator.prepare();
-            while (nni.hasNext())
-            {
-                CompactionIterator.CompactedRow row = nni.next();
-                writer.append(row.key, row.buffer);
-                validator.add(row);
-                totalkeysWritten++;
-            }
-            validator.complete();
-        }
-        finally
-        {
-            ci.close();
-        }
-
-        SSTableReader ssTable = writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table_));
-        ssTables_.add(ssTable);
         ssTables_.markCompacted(sstables);
-        gcAfterRpcTimeout();
-        CompactionManager.instance.submitMinor(ColumnFamilyStore.this);
-
-        String format = "Compacted to %s.  %d/%d bytes for %d keys.  Time: %dms.";
-        long dTime = System.currentTimeMillis() - startTime;
-        logger_.info(String.format(format, writer.getFilename(), getTotalBytes(sstables), ssTable.length(), totalkeysWritten, dTime));
-        return sstables.size();
     }
 
-    /**
-     * perform a GC to clean out obsolete sstables, sleeping rpc timeout first so that most in-progress ops can complete
-     * (thus, no longer reference the sstables in question)
-     */
-    private void gcAfterRpcTimeout()
+    boolean isCompleteSSTables(Collection<SSTableReader> sstables)
     {
-        new Thread(new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    Thread.sleep(DatabaseDescriptor.getRpcTimeout());
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-                System.gc();
-            }
-        }).start();
+        return ssTables_.getSSTables().equals(new HashSet<SSTableReader>(sstables));
     }
 
-    /**
-     * Performs a readonly "compaction" of all sstables in order to validate complete rows,
-     * but without writing the merge result
-     */
-    void doReadonlyCompaction(InetAddress initiator) throws IOException
+    void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements)
+            throws IOException
     {
-        Collection<SSTableReader> sstables = ssTables_.getSSTables();
-        CompactionIterator ci = new CompactionIterator(sstables, 
getDefaultGCBefore(), true);
-        try
+        for (SSTableReader sstable : replacements)
         {
-            Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
-
-            // validate the CF as we iterate over it
-            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator, true);
-            validator.prepare();
-            while (nni.hasNext())
-            {
-                CompactionIterator.CompactedRow row = nni.next();
-                validator.add(row);
-            }
-            validator.complete();
+            ssTables_.add(sstable);
         }
-        finally
-        {
-            ci.close();
-        }
-    }
-
-    private long getTotalBytes(Iterable<SSTableReader> sstables)
-    {
-        long sum = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            sum += sstable.length();
-        }
-        return sum;
+        ssTables_.markCompacted(sstables);
     }
 
     public static List<Memtable> getUnflushedMemtables(String cfName)
@@ -1169,7 +798,7 @@
 
     public ColumnFamily getColumnFamily(QueryFilter filter) throws IOException
     {
-        return getColumnFamily(filter, getDefaultGCBefore());
+        return getColumnFamily(filter, CompactionManager.getDefaultGCBefore());
     }
 
     /**
@@ -1594,40 +1223,4 @@
         memtable_.clearUnsafe();
         ssTables_.clearUnsafe();
     }
-
-    private static class AntiCompactionIterator extends CompactionIterator
-    {
-        public AntiCompactionIterator(Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore, boolean isMajor)
-                throws IOException
-        {
-            super(getCollatedRangeIterator(sstables, ranges), gcBefore, isMajor);
-        }
-
-        private static Iterator getCollatedRangeIterator(Collection<SSTableReader> sstables, final Collection<Range> ranges)
-                throws IOException
-        {
-            org.apache.commons.collections.Predicate rangesPredicate = new org.apache.commons.collections.Predicate()
-            {
-                public boolean evaluate(Object row)
-                {
-                    return Range.isTokenInRanges(((IteratingRow)row).getKey().token, ranges);
-                }
-            };
-            CollatingIterator iter = FBUtilities.<IteratingRow>getCollatingIterator();
-            for (SSTableReader sstable : sstables)
-            {
-                SSTableScanner scanner = sstable.getScanner(FILE_BUFFER_SIZE);
-                iter.addIterator(new FilterIterator(scanner, rangesPredicate));
-            }
-            return iter;
-        }
-
-        public void close() throws IOException
-        {
-            for (Object o : ((CollatingIterator)source).getIterators())
-            {
-                ((SSTableScanner)((FilterIterator)o).getIterator()).close();
-            }
-        }
-    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Dec 23 19:18:05 2009
@@ -19,9 +19,9 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.io.File;
 import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.Collection;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -31,9 +31,22 @@
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
 import java.net.InetAddress;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.collections.iterators.FilterIterator;
+import org.apache.commons.collections.iterators.CollatingIterator;
+import org.apache.commons.collections.PredicateUtils;
+
+import com.sun.corba.se.impl.logging.POASystemException;
+
 public class CompactionManager implements CompactionManagerMBean
 {
     public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
@@ -64,18 +77,34 @@
      * It's okay to over-call (within reason) since the compactions are single-threaded,
      * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
      */
-    public Future<Integer> submitMinor(final ColumnFamilyStore columnFamilyStore)
-    {
-        return submitMinor(columnFamilyStore, minimumCompactionThreshold, maximumCompactionThreshold);
-    }
-
-    Future<Integer> submitMinor(final ColumnFamilyStore cfStore, final int minThreshold, final int maxThreshold)
+    public Future<Integer> submitMinor(final ColumnFamilyStore cfs)
     {
         Callable<Integer> callable = new Callable<Integer>()
         {
             public Integer call() throws IOException
             {
-                return cfStore.doCompaction(minThreshold, maxThreshold);
+                int filesCompacted = 0;
+                if (minimumCompactionThreshold > 0 && maximumCompactionThreshold > 0)
+                {
+                    logger.debug("Checking to see if compaction of " + 
cfs.columnFamily_ + " would be useful");
+                    for (List<SSTableReader> sstables : 
getCompactionBuckets(cfs.getSSTables(), 50L * 1024L * 1024L))
+                    {
+                        if (sstables.size() < minimumCompactionThreshold)
+                        {
+                            continue;
+                        }
+                        // if we have too many to compact all at once, compact older ones first -- this avoids
+                        // re-compacting files we just created.
+                        Collections.sort(sstables);
+                        filesCompacted += doCompaction(cfs, sstables.subList(0, Math.min(sstables.size(), maximumCompactionThreshold)), getDefaultGCBefore());
+                    }
+                    logger.debug(filesCompacted + " files compacted");
+                }
+                else
+                {
+                    logger.debug("Compaction is currently disabled.");
+                }
+                return filesCompacted;
             }
         };
         return compactor_.submit(callable);
@@ -87,32 +116,54 @@
         {
             public Object call() throws IOException
             {
-                cfStore.doCleanupCompaction();
+                doCleanupCompaction(cfStore);
                 return this;
             }
         };
         return compactor_.submit(runnable);
     }
 
-    public Future<List<SSTableReader>> submitAnti(final ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress target)
+    public Future<List<SSTableReader>> submitAnticompaction(final ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress target)
     {
         Callable<List<SSTableReader>> callable = new Callable<List<SSTableReader>>()
         {
             public List<SSTableReader> call() throws IOException
             {
-                return cfStore.doAntiCompaction(ranges, target);
+                return doAntiCompaction(cfStore, cfStore.getSSTables(), ranges, target);
             }
         };
         return compactor_.submit(callable);
     }
 
-    public Future submitMajor(final ColumnFamilyStore cfStore, final long skip)
+    public Future submitMajor(final ColumnFamilyStore cfStore)
+    {
+        return submitMajor(cfStore, 0, getDefaultGCBefore());
+    }
+
+    public Future submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore)
     {
         Callable<Object> callable = new Callable<Object>()
         {
             public Object call() throws IOException
             {
-                cfStore.doMajorCompaction(skip);
+                Collection<SSTableReader> sstables;
+                if (skip > 0)
+                {
+                    sstables = new ArrayList<SSTableReader>();
+                    for (SSTableReader sstable : cfStore.getSSTables())
+                    {
+                        if (sstable.length() < skip * 1024L * 1024L * 1024L)
+                        {
+                            sstables.add(sstable);
+                        }
+                    }
+                }
+                else
+                {
+                    sstables = cfStore.getSSTables();
+                }
+
+                doCompaction(cfStore, sstables, gcBefore);
                 return this;
             }
         };
@@ -125,7 +176,7 @@
         {
             public Object call() throws IOException
             {
-                cfStore.doReadonlyCompaction(initiator);
+                doReadonlyCompaction(cfStore, initiator);
                 return this;
             }
         };
@@ -164,9 +215,318 @@
         maximumCompactionThreshold = threshold;
     }
 
-    public void disableCompactions()
+    public void disableAutoCompaction()
     {
         minimumCompactionThreshold = 0;
         maximumCompactionThreshold = 0;
     }
+
+    /**
+     * For internal use and testing only.  The rest of the system should go through the submit* methods,
+     * which are properly serialized.
+     */
+    int doCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore) throws IOException
+    {
+        // The collection of sstables passed may be empty (but not null); even if
+        // it is not empty, it may compact down to nothing if all rows are deleted.
+        Table table = cfs.getTable();
+        if (DatabaseDescriptor.isSnapshotBeforeCompaction())
+            table.snapshot("compact-" + cfs.columnFamily_);
+        logger.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
+        String compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables));
+        // If the compaction file path is null that means we have no space left for this compaction.
+        // try again w/o the largest one.
+        if (compactionFileLocation == null)
+        {
+            SSTableReader maxFile = cfs.getMaxSizeFile(sstables);
+            List<SSTableReader> smallerSSTables = new ArrayList<SSTableReader>(sstables);
+            smallerSSTables.remove(maxFile);
+            return doCompaction(cfs, smallerSSTables, gcBefore);
+        }
+
+        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
+        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
+        // all the sstables (that existed when we started)
+        boolean major = cfs.isCompleteSSTables(sstables);
+
+        long startTime = System.currentTimeMillis();
+        long totalkeysWritten = 0;
+
+        // TODO the int cast here is potentially buggy
+        int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
+        if (logger.isDebugEnabled())
+          logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
+
+        SSTableWriter writer;
+        CompactionIterator ci = new CompactionIterator(sstables, gcBefore, major); // retain a handle so we can call close()
+        Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
+
+        try
+        {
+            if (!nni.hasNext())
+            {
+                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                // we need to sync it (via closeAndOpen) first, so there is no period during which
+                // a crash could cause data loss.
+                cfs.markCompacted(sstables);
+                return 0;
+            }
+
+            String newFilename = new File(compactionFileLocation, cfs.getTempSSTableFileName()).getAbsolutePath();
+            writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
+
+            // validate the CF as we iterate over it
+            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table.name, cfs.getColumnFamilyName(), null, major);
+            validator.prepare();
+            while (nni.hasNext())
+            {
+                CompactionIterator.CompactedRow row = nni.next();
+                writer.append(row.key, row.buffer);
+                validator.add(row);
+                totalkeysWritten++;
+            }
+            validator.complete();
+        }
+        finally
+        {
+            ci.close();
+        }
+
+        SSTableReader ssTable = writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table.name));
+        cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
+        gcAfterRpcTimeout();
+        instance.submitMinor(cfs);
+
+        String format = "Compacted to %s.  %d/%d bytes for %d keys.  Time: %dms.";
+        long dTime = System.currentTimeMillis() - startTime;
+        logger.info(String.format(format, writer.getFilename(), SSTable.getTotalBytes(sstables), ssTable.length(), totalkeysWritten, dTime));
+        return sstables.size();
+    }
+
+    /**
+     * This function is used to do the anti-compaction process; it spits out the file which has keys that belong to a given range.
+     * If the target is not specified it spits out the file as a compacted file with the unnecessary ranges wiped out.
+     *
+     * @param cfs
+     * @param sstables
+     * @param ranges
+     * @param target
+     * @return
+     * @throws java.io.IOException
+     */
+    private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target)
+            throws IOException
+    {
+        Table table = cfs.getTable();
+        logger.info("AntiCompacting [" + StringUtils.join(sstables, ",") + 
"]");
+        // Calculate the expected compacted filesize
+        long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(sstables) / 2;
+        String compactionFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table.name, expectedRangeFileSize);
+        if (compactionFileLocation == null)
+        {
+            throw new UnsupportedOperationException("disk full");
+        }
+        if (target != null)
+        {
+            // compacting for streaming: send to subdirectory
+            compactionFileLocation = compactionFileLocation + File.separator + DatabaseDescriptor.STREAMING_SUBDIR;
+        }
+        List<SSTableReader> results = new ArrayList<SSTableReader>();
+
+        long startTime = System.currentTimeMillis();
+        long totalkeysWritten = 0;
+
+        int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
+        if (logger.isDebugEnabled())
+          logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
+
+        SSTableWriter writer = null;
+        CompactionIterator ci = new AntiCompactionIterator(sstables, ranges, getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
+        Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
+
+        try
+        {
+            if (!nni.hasNext())
+            {
+                return results;
+            }
+
+            while (nni.hasNext())
+            {
+                CompactionIterator.CompactedRow row = nni.next();
+                if (writer == null)
+                {
+                    FileUtils.createDirectory(compactionFileLocation);
+                    String newFilename = new File(compactionFileLocation, cfs.getTempSSTableFileName()).getAbsolutePath();
+                    writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
+                }
+                writer.append(row.key, row.buffer);
+                totalkeysWritten++;
+            }
+        }
+        finally
+        {
+            ci.close();
+        }
+
+        if (writer != null)
+        {
+            results.add(writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table.name)));
+            String format = "AntiCompacted to %s.  %d/%d bytes for %d keys.  Time: %dms.";
+            long dTime = System.currentTimeMillis() - startTime;
+            logger.info(String.format(format, writer.getFilename(), SSTable.getTotalBytes(sstables), results.get(0).length(), totalkeysWritten, dTime));
+        }
+
+        return results;
+    }
+
+    /**
+     * This function goes over each file and removes the keys that the node is not responsible for
+     * and only keeps keys that this node is responsible for.
+     *
+     * @throws IOException
+     */
+    private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
+    {
+        Collection<SSTableReader> originalSSTables = cfs.getSSTables();
+        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, StorageService.instance().getLocalRanges(), null);
+        if (!sstables.isEmpty())
+        {
+            cfs.replaceCompactedSSTables(originalSSTables, sstables);
+        }
+        CompactionManager.gcAfterRpcTimeout();
+    }
+
+    /**
+     * Performs a readonly "compaction" of all sstables in order to validate complete rows,
+     * but without writing the merge result
+     */
+    private void doReadonlyCompaction(ColumnFamilyStore cfs, InetAddress initiator) throws IOException
+    {
+        Collection<SSTableReader> sstables = cfs.getSSTables();
+        CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore(), true);
+        try
+        {
+            Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
+
+            // validate the CF as we iterate over it
+            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(cfs.getTable().name, cfs.getColumnFamilyName(), initiator, true);
+            validator.prepare();
+            while (nni.hasNext())
+            {
+                CompactionIterator.CompactedRow row = nni.next();
+                validator.add(row);
+            }
+            validator.complete();
+        }
+        finally
+        {
+            ci.close();
+        }
+    }
+
+    /**
+     * perform a GC to clean out obsolete sstables, sleeping rpc timeout first so that most in-progress ops can complete
+     * (thus, no longer reference the sstables in question)
+     */
+    static void gcAfterRpcTimeout()
+    {
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    Thread.sleep(DatabaseDescriptor.getRpcTimeout());
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+                System.gc();
+            }
+        }).start();
+    }
+
+    /*
+    * Group files of similar size into buckets.
+    */
+    static Set<List<SSTableReader>> getCompactionBuckets(Iterable<SSTableReader> files, long min)
+    {
+        Map<List<SSTableReader>, Long> buckets = new HashMap<List<SSTableReader>, Long>();
+        for (SSTableReader sstable : files)
+        {
+            long size = sstable.length();
+
+            boolean bFound = false;
+            // look for a bucket containing similar-sized files:
+            // group in the same bucket if it's w/in 50% of the average for this bucket,
+            // or this file and the bucket are all considered "small" (less than `min`)
+            for (List<SSTableReader> bucket : buckets.keySet())
+            {
+                long averageSize = buckets.get(bucket);
+                if ((size > averageSize / 2 && size < 3 * averageSize / 2)
+                    || (size < min && averageSize < min))
+                {
+                    // remove and re-add because adding changes the hash
+                    buckets.remove(bucket);
+                    averageSize = (averageSize + size) / 2;
+                    bucket.add(sstable);
+                    buckets.put(bucket, averageSize);
+                    bFound = true;
+                    break;
+                }
+            }
+            // no similar bucket found; put it in a new one
+            if (!bFound)
+            {
+                ArrayList<SSTableReader> bucket = new ArrayList<SSTableReader>();
+                bucket.add(sstable);
+                buckets.put(bucket, size);
+            }
+        }
+
+        return buckets.keySet();
+    }
+
+    public static int getDefaultGCBefore()
+    {
+        return (int)(System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds();
+    }
+
+    private static class AntiCompactionIterator extends CompactionIterator
+    {
+        public AntiCompactionIterator(Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore, boolean isMajor)
+                throws IOException
+        {
+            super(getCollatedRangeIterator(sstables, ranges), gcBefore, isMajor);
+        }
+
+        private static Iterator getCollatedRangeIterator(Collection<SSTableReader> sstables, final Collection<Range> ranges)
+                throws IOException
+        {
+            org.apache.commons.collections.Predicate rangesPredicate = new org.apache.commons.collections.Predicate()
+            {
+                public boolean evaluate(Object row)
+                {
+                    return Range.isTokenInRanges(((IteratingRow)row).getKey().token, ranges);
+                }
+            };
+            CollatingIterator iter = FBUtilities.<IteratingRow>getCollatingIterator();
+            for (SSTableReader sstable : sstables)
+            {
+                SSTableScanner scanner = sstable.getScanner(FILE_BUFFER_SIZE);
+                iter.addIterator(new FilterIterator(scanner, rangesPredicate));
+            }
+            return iter;
+        }
+
+        public void close() throws IOException
+        {
+            for (Object o : ((CollatingIterator)source).getIterators())
+            {
+                ((SSTableScanner)((FilterIterator)o).getIterator()).close();
+            }
+        }
+    }
 }
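
For reference, the getCompactionBuckets heuristic moved here groups a file into
an existing bucket when its size is within 50% of that bucket's running
average, or when both the file and the bucket average fall below the `min`
floor (50MB in submitMinor).  A standalone sketch of the same grouping over
plain long sizes (longs stand in for SSTableReaders; the class name and
example values are hypothetical, for illustration only):

    import java.util.*;

    public class BucketingSketch
    {
        // simplified re-implementation of the bucketing heuristic, not the actual API
        static Set<List<Long>> buckets(Iterable<Long> sizes, long min)
        {
            Map<List<Long>, Long> buckets = new HashMap<List<Long>, Long>();
            for (long size : sizes)
            {
                boolean found = false;
                for (List<Long> bucket : new ArrayList<List<Long>>(buckets.keySet()))
                {
                    long averageSize = buckets.get(bucket);
                    if ((size > averageSize / 2 && size < 3 * averageSize / 2)
                        || (size < min && averageSize < min))
                    {
                        // remove and re-add because mutating the list changes its hash
                        buckets.remove(bucket);
                        bucket.add(size);
                        buckets.put(bucket, (averageSize + size) / 2);
                        found = true;
                        break;
                    }
                }
                if (!found)
                {
                    List<Long> bucket = new ArrayList<Long>();
                    bucket.add(size);
                    buckets.put(bucket, size);
                }
            }
            return buckets.keySet();
        }

        public static void main(String[] args)
        {
            long mb = 1024L * 1024L;
            // 90/100/110MB group together, 1GB lands alone, the two 10MB files share the "small" bucket
            System.out.println(buckets(Arrays.asList(100 * mb, 110 * mb, 1024 * mb, 90 * mb, 10 * mb, 10 * mb), 50 * mb));
        }
    }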

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Dec 23 19:18:05 2009
@@ -24,6 +24,7 @@
 import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.io.IOException;
@@ -187,7 +188,14 @@
             }
         }
         hintStore.forceFlush();
-        hintStore.doMajorCompaction(0);
+        try
+        {
+            CompactionManager.instance.submitMajor(hintStore).get();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
 
         if (logger_.isDebugEnabled())
           logger_.debug("Finished deliverAllHints");

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Dec 23 19:18:05 2009
@@ -25,6 +25,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
@@ -298,7 +299,14 @@
                 continue;
             
             ColumnFamilyStore cfStore = columnFamilyStores.get( columnFamily );
-            allResults.addAll(cfStore.forceAntiCompaction(ranges, target));
+            try
+            {
+                allResults.addAll(CompactionManager.instance.submitAnticompaction(cfStore, ranges, target).get());
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
         }
         return allResults;
     }
@@ -314,7 +322,7 @@
         {
             ColumnFamilyStore cfStore = columnFamilyStores.get( columnFamily );
             if ( cfStore != null )
-                CompactionManager.instance.submitMajor(cfStore, 0);
+                CompactionManager.instance.submitMajor(cfStore);
         }
     }
 
@@ -474,6 +482,11 @@
         return applicationColumnFamilies;
     }
 
+    public String getDataFileLocation(long expectedCompactedFileSize)
+    {
+        return DatabaseDescriptor.getDataFileLocationForTable(name, expectedCompactedFileSize);
+    }
+
     public static String getSnapshotPath(String dataDirPath, String tableName, String snapshotName)
     {
         return dataDirPath + File.separator + tableName + File.separator + SNAPSHOT_SUBDIR_NAME + File.separator + snapshotName;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed Dec 23 19:18:05 2009
@@ -161,6 +161,16 @@
         logger.info("Deleted " + path);
     }
 
+    public static long getTotalBytes(Iterable<SSTableReader> sstables)
+    {
+        long sum = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            sum += sstable.length();
+        }
+        return sum;
+    }
+
     /**
      * This is a simple container for the index Key and its corresponding position
      * in the data file. Binary search is performed on a list of these objects

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Dec 23 19:18:05 2009
@@ -652,7 +652,7 @@
             try
             {
                 List<Range> ranges = new ArrayList<Range>(differences);
-                List<SSTableReader> sstables = CompactionManager.instance.submitAnti(cfstore, ranges, remote).get();
+                List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
                 Streaming.transferSSTables(remote, sstables, cf.left);
             }
             catch(Exception e)

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Dec 23 19:18:05 2009
@@ -18,13 +18,10 @@
 */
 package org.apache.cassandra.db;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.ArrayUtils;
 import static org.junit.Assert.assertNull;
 import org.junit.Test;
@@ -119,7 +116,7 @@
         Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz"));
         ranges.add(r);
 
-        List<SSTableReader> fileList = store.forceAntiCompaction(ranges, InetAddress.getByName("127.0.0.1"));
+        List<SSTableReader> fileList = CompactionManager.instance.submitAnticompaction(store, ranges, InetAddress.getByName("127.0.0.1")).get();
         assert fileList.size() >= 1;
     }
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Wed Dec 23 19:18:05 2009
@@ -45,6 +45,8 @@
     @Test
     public void testCompactions() throws IOException, ExecutionException, InterruptedException
     {
+        CompactionManager.instance.disableAutoCompaction();
+
         // this test does enough rows to force multiple block indexes to be used
         Table table = Table.open(TABLE1);
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
@@ -70,7 +72,7 @@
         }
         if (store.getSSTables().size() > 1)
         {
-            store.doCompaction(2, store.getSSTables().size());
+            CompactionManager.instance.submitMajor(store).get();
         }
         assertEquals(inserted.size(), table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
     }
@@ -78,7 +80,7 @@
     @Test
     public void testCompactionPurge() throws IOException, ExecutionException, InterruptedException
     {
-        CompactionManager.instance.disableCompactions();
+        CompactionManager.instance.disableAutoCompaction();
 
         Table table = Table.open(TABLE1);
         String cfName = "Standard1";
@@ -117,12 +119,12 @@
         rm.add(new QueryPath(cfName, null, "0".getBytes()), new byte[0], 0);
         rm.apply();
         store.forceBlockingFlush();
-        store.doFileCompaction(sstablesIncomplete, Integer.MAX_VALUE);
+        CompactionManager.instance.doCompaction(store, sstablesIncomplete, CompactionManager.getDefaultGCBefore());
         ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
         assert cf.getColumnCount() == 10;
 
         // major compact and test that all columns but the resurrected one is completely gone
-        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+        CompactionManager.instance.submitMajor(store, 0, Integer.MAX_VALUE).get();
         cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
         assert cf.getColumnCount() == 1;
         assert cf.getColumn(String.valueOf(5).getBytes()) != null;
@@ -131,7 +133,7 @@
     @Test
     public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
     {
-        CompactionManager.instance.disableCompactions();
+        CompactionManager.instance.disableAutoCompaction();
 
         Table table = Table.open(TABLE1);
         String cfName = "Standard2";
@@ -160,7 +162,7 @@
         assert store.getSSTables().size() == 1 : store.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
 
         // compact and test that the row is completely gone
-        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+        CompactionManager.instance.submitMajor(store, 0, Integer.MAX_VALUE).get();
         assert store.getSSTables().isEmpty();
         ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
         assert cf == null : cf;
@@ -169,6 +171,8 @@
     @Test
     public void testCompactionReadonly() throws IOException, ExecutionException, InterruptedException
     {
+        CompactionManager.instance.disableAutoCompaction();
+
         Table table = Table.open(TABLE2);
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
 
@@ -188,7 +192,7 @@
 
         // perform readonly compaction and confirm that no sstables changed
         ArrayList<SSTableReader> oldsstables = new ArrayList<SSTableReader>(store.getSSTables());
-        store.doReadonlyCompaction(LOCAL);
+        CompactionManager.instance.submitReadonly(store, LOCAL).get();
         assertEquals(oldsstables, new ArrayList<SSTableReader>(store.getSSTables()));
         assertEquals(inserted.size(), table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
     }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Wed Dec 23 19:18:05 2009
@@ -33,6 +33,8 @@
 {
     private void testCompaction(String columnFamilyName, int insertsPerTable) throws IOException, ExecutionException, InterruptedException
     {
+        CompactionManager.instance.disableAutoCompaction();
+
         Table table = Table.open("Keyspace1");
         ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName);
 
@@ -46,8 +48,7 @@
             store.forceBlockingFlush();
             assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
         }
-        Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
-        ft.get();
+        CompactionManager.instance.submitMajor(store).get();
         assertEquals(1, store.getSSTables().size());
         assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());
     }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Wed Dec 23 19:18:05 2009
@@ -37,7 +37,7 @@
     @Test
     public void testWithFlush() throws IOException, ExecutionException, InterruptedException
     {
-        CompactionManager.instance.disableCompactions();
+        CompactionManager.instance.disableAutoCompaction();
 
         for (int i = 0; i < 100; i++)
         {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Wed Dec 23 19:18:05 2009
@@ -59,8 +59,7 @@
         store.forceBlockingFlush();
         validateRemoveTwoSources();
 
-        Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
-        ft.get();
+        CompactionManager.instance.submitMajor(store).get();
         assertEquals(1, store.getSSTables().size());
         validateRemoveCompacted();
     }
@@ -144,8 +143,7 @@
         store.forceBlockingFlush();
         validateRemoveWithNewData();
 
-        Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
-        ft.get();
+        CompactionManager.instance.submitMajor(store).get();
         assertEquals(1, store.getSSTables().size());
         validateRemoveWithNewData();
     }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Wed Dec 23 19:18:05 2009
@@ -359,7 +359,7 @@
         // compact so we have a big row with more than the minimum index count
         if (cfStore.getSSTables().size() > 1)
         {
-            cfStore.doCompaction(2, cfStore.getSSTables().size());
+            CompactionManager.instance.submitMajor(cfStore).get();
         }
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
         DecoratedKey decKey = sstable.getPartitioner().decorateKey(key);

