Author: jbellis
Date: Tue Apr 14 05:12:32 2009
New Revision: 764678

URL: http://svn.apache.org/viewvc?rev=764678&view=rev
Log:
add compaction test showing regression.  patch by jbellis; reviewed by Todd Lipcon and Jun Rao for #80.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Tue Apr 14 05:12:32 2009
@@ -74,7 +74,7 @@
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
     /* SSTables on disk for this column family */
-    private Set<String> ssTables_ = new HashSet<String>();
+    Set<String> ssTables_ = new HashSet<String>();
 
     /* Modification lock used for protecting reads from compactions. */
     private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -162,6 +162,7 @@
         {
                HintedHandOffManager.instance().submit(this);
         }
+        // TODO this seems unnecessary -- each memtable flush checks to see if it needs to compact, too
         MinorCompactionManager.instance().submitPeriodicCompaction(this);
     }
 
@@ -671,7 +672,7 @@
                }
                catch ( Exception ex)
                {
-                       ex.printStackTrace();
+                    logger_.warn("corrupt file?  or are you just blowing away 
data files manually out from under me?", ex);
                        try
                        {
                                if (fs != null)
@@ -734,7 +735,7 @@
     /*
      * Break the files into buckets and then compact.
      */
-    void doCompaction()
+    void doCompaction() throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
@@ -755,17 +756,10 @@
                                        if( count == threshHold_ )
                                                break;
                                }
-                       try
-                       {
-                               // For each bucket if it has crossed the threshhold do the compaction
-                               // In case of range  compaction merge the counting bloom filters also.
-                               if( count == threshHold_)
-                                       doFileCompaction(files, bufSize_);
-                       }
-                       catch ( Exception ex)
-                       {
-                               logger_.warn(LogUtil.throwableToString(ex));
-                       }
+                    // For each bucket if it has crossed the threshhold do the compaction
+                    // In case of range  compaction merge the counting bloom filters also.
+                    if( count == threshHold_)
+                        doFileCompaction(files, bufSize_);
                        }
                }
         }
@@ -1175,7 +1169,7 @@
      * to get the latest data.
      *
      */
-    void  doFileCompaction(List<String> files,  int minBufferSize)
+    void  doFileCompaction(List<String> files,  int minBufferSize) throws IOException
     {
        String newfile = null;
         long startTime = System.currentTimeMillis();
@@ -1183,175 +1177,168 @@
         long totalBytesWritten = 0;
         long totalkeysRead = 0;
         long totalkeysWritten = 0;
-        try
-        {
-               // Calculate the expected compacted filesize
-               long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
-               String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
-               // If the compaction file path is null that means we have no space left for this compaction.
-               if( compactionFileLocation == null )
-               {
-                       String maxFile = getMaxSizeFile( files );
-                       files.remove( maxFile );
-                       doFileCompaction(files , minBufferSize);
-                       return;
-               }
-               PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
-               if (pq.size() > 0)
-               {
-                   String mergedFileName = getTempFileName( files );
-                   SSTable ssTable = null;
-                   String lastkey = null;
-                   List<FileStruct> lfs = new ArrayList<FileStruct>();
-                   DataOutputBuffer bufOut = new DataOutputBuffer();
-                   int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
-                   expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
-                   logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
-                   /* Create the bloom filter for the compacted file. */
-                   BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
-                   List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+        // Calculate the expected compacted filesize
+        long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
+        String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
+        // If the compaction file path is null that means we have no space left for this compaction.
+        if( compactionFileLocation == null )
+        {
+            String maxFile = getMaxSizeFile( files );
+            files.remove( maxFile );
+            doFileCompaction(files , minBufferSize);
+            return;
+        }
+        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
+        if (pq.size() > 0)
+        {
+            String mergedFileName = getTempFileName( files );
+            SSTable ssTable = null;
+            String lastkey = null;
+            List<FileStruct> lfs = new ArrayList<FileStruct>();
+            DataOutputBuffer bufOut = new DataOutputBuffer();
+            int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+            expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+            logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+            /* Create the bloom filter for the compacted file. */
+            BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+            List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
 
-                   while (pq.size() > 0 || lfs.size() > 0)
-                   {
-                       FileStruct fs = null;
-                       if (pq.size() > 0)
-                       {
-                           fs = pq.poll();                        
-                       }
-                       if (fs != null
-                               && (lastkey == null || lastkey.equals(fs.getKey())))
-                       {
-                           // The keys are the same so we need to add this to the
-                           // ldfs list
-                           lastkey = fs.getKey();
-                           lfs.add(fs);
-                       }
-                       else
-                       {
-                           Collections.sort(lfs, new FileStructComparator());
-                           ColumnFamily columnFamily;
-                           bufOut.reset();
-                           if(lfs.size() > 1)
-                           {
-                                   for (FileStruct filestruct : lfs)
-                                   {
-                                       try
-                                       {
-                                       /* read the length although we don't need it */
-                                       filestruct.getBufIn().readInt();
-                                       // Skip the Index
-                                    IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
-                                       // We want to add only 2 and resolve them right there in order to save on memory footprint
-                                       if(columnFamilies.size() > 1)
-                                       {
-                                               merge(columnFamilies);
-                                       }
-                                               // deserialize into column families
-                                               columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
-                                       }
-                                       catch ( Exception ex)
-                                       {
-                                    logger_.warn("error in filecompaction", ex);
+            while (pq.size() > 0 || lfs.size() > 0)
+            {
+                FileStruct fs = null;
+                if (pq.size() > 0)
+                {
+                    fs = pq.poll();
+                }
+                if (fs != null
+                        && (lastkey == null || lastkey.equals(fs.getKey())))
+                {
+                    // The keys are the same so we need to add this to the
+                    // ldfs list
+                    lastkey = fs.getKey();
+                    lfs.add(fs);
+                }
+                else
+                {
+                    Collections.sort(lfs, new FileStructComparator());
+                    ColumnFamily columnFamily;
+                    bufOut.reset();
+                    if(lfs.size() > 1)
+                    {
+                        for (FileStruct filestruct : lfs)
+                        {
+                            try
+                            {
+                                /* read the length although we don't need it */
+                                filestruct.getBufIn().readInt();
+                                // Skip the Index
+                                IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+                                // We want to add only 2 and resolve them right there in order to save on memory footprint
+                                if(columnFamilies.size() > 1)
+                                {
+                                    merge(columnFamilies);
                                 }
-                                   }
-                                   // Now after merging all crap append to the sstable
-                                   columnFamily = resolveAndRemoveDeleted(columnFamilies);
-                                   columnFamilies.clear();
-                                   if( columnFamily != null )
-                                   {
-                                               /* serialize the cf with column indexes */
-                                           ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
-                                   }
-                           }
-                           else
-                           {
-                                   FileStruct filestruct = lfs.get(0);
-                               try
-                               {
-                                       /* read the length although we don't need it */
-                                       int size = filestruct.getBufIn().readInt();
-                                       bufOut.write(filestruct.getBufIn(), size);
-                               }
-                               catch ( Exception ex)
-                               {
-                                       ex.printStackTrace();
-                                   filestruct.close();
-                                   continue;
-                               }
-                           }
-                                        
-                           if ( ssTable == null )
-                           {
-                               ssTable = new SSTable(compactionFileLocation, mergedFileName);
-                           }
-                        ssTable.append(lastkey, bufOut);
+                                // deserialize into column families
+                                columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+                            }
+                            catch ( Exception ex)
+                            {
+                                logger_.warn("error in filecompaction", ex);
+                            }
+                        }
+                        // Now after merging all crap append to the sstable
+                        columnFamily = resolveAndRemoveDeleted(columnFamilies);
+                        columnFamilies.clear();
+                        if( columnFamily != null )
+                        {
+                            /* serialize the cf with column indexes */
+                            ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+                        }
+                    }
+                    else
+                    {
+                        FileStruct filestruct = lfs.get(0);
+                        try
+                        {
+                            /* read the length although we don't need it */
+                            int size = filestruct.getBufIn().readInt();
+                            bufOut.write(filestruct.getBufIn(), size);
+                        }
+                        catch ( Exception ex)
+                        {
+                            ex.printStackTrace();
+                            filestruct.close();
+                            continue;
+                        }
+                    }
 
-                        /* Fill the bloom filter with the key */
-                           doFill(compactedBloomFilter, lastkey);
-                           totalkeysWritten++;
-                           for (FileStruct filestruct : lfs)
-                           {
-                               try
-                               {
-                                filestruct.advance();
-                                       if (filestruct.isExhausted())
-                                       {
-                                               continue;
-                                       }
-                                       pq.add(filestruct);
-                                       totalkeysRead++;
-                               }
-                               catch ( Throwable ex )
-                               {
-                                       // Ignore the exception as it might be a corrupted file
-                                       // in any case we have read as far as possible from it
-                                       // and it will be deleted after compaction.
-                                   filestruct.close();
+                    if ( ssTable == null )
+                    {
+                        ssTable = new SSTable(compactionFileLocation, mergedFileName);
+                    }
+                    ssTable.append(lastkey, bufOut);
+
+                    /* Fill the bloom filter with the key */
+                    doFill(compactedBloomFilter, lastkey);
+                    totalkeysWritten++;
+                    for (FileStruct filestruct : lfs)
+                    {
+                        try
+                        {
+                            filestruct.advance();
+                            if (filestruct.isExhausted())
+                            {
+                                continue;
                             }
-                           }
-                           lfs.clear();
-                           lastkey = null;
-                           if (fs != null)
-                           {
-                               /* Add back the fs since we processed the rest of filestructs */
-                               pq.add(fs);
-                           }
-                       }
-                   }
-                   if ( ssTable != null )
-                   {
-                       ssTable.closeRename(compactedBloomFilter);
-                       newfile = ssTable.getDataFileLocation();
-                   }
-                   lock_.writeLock().lock();
-                   try
-                   {
-                       for (String file : files)
-                       {
-                           ssTables_.remove(file);
-                           SSTable.removeAssociatedBloomFilter(file);
-                       }
-                       if ( newfile != null )
-                       {
-                           ssTables_.add(newfile);
-                           logger_.debug("Inserting bloom filter for file " + 
newfile);
-                           SSTable.storeBloomFilter(newfile, 
compactedBloomFilter);
-                           totalBytesWritten = (new File(newfile)).length();
-                       }
-                   }
-                   finally
-                   {
-                       lock_.writeLock().unlock();
-                   }
-                   for (String file : files)
-                   {
-                       SSTable.delete(file);
-                   }
-               }
-        }
-        catch ( Exception ex)
-        {
-            logger_.warn( LogUtil.throwableToString(ex) );
+                            pq.add(filestruct);
+                            totalkeysRead++;
+                        }
+                        catch ( Throwable ex )
+                        {
+                            // Ignore the exception as it might be a corrupted file
+                            // in any case we have read as far as possible from it
+                            // and it will be deleted after compaction.
+                            filestruct.close();
+                        }
+                    }
+                    lfs.clear();
+                    lastkey = null;
+                    if (fs != null)
+                    {
+                        /* Add back the fs since we processed the rest of filestructs */
+                        pq.add(fs);
+                    }
+                }
+            }
+            if ( ssTable != null )
+            {
+                ssTable.closeRename(compactedBloomFilter);
+                newfile = ssTable.getDataFileLocation();
+            }
+            lock_.writeLock().lock();
+            try
+            {
+                for (String file : files)
+                {
+                    ssTables_.remove(file);
+                    SSTable.removeAssociatedBloomFilter(file);
+                }
+                if ( newfile != null )
+                {
+                    ssTables_.add(newfile);
+                    logger_.debug("Inserting bloom filter for file " + 
newfile);
+                    SSTable.storeBloomFilter(newfile, compactedBloomFilter);
+                    totalBytesWritten = (new File(newfile)).length();
+                }
+            }
+            finally
+            {
+                lock_.writeLock().unlock();
+            }
+            for (String file : files)
+            {
+                SSTable.delete(file);
+            }
         }
         logger_.debug("Total time taken for compaction  ..."
                 + (System.currentTimeMillis() - startTime));

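A note on the hunk above: beyond un-swallowing exceptions, the main loop of doFileCompaction is a k-way merge. A PriorityQueue orders the open file scanners (FileStructs) by their current key; entries with equal keys are collected into lfs and resolved together; each consumed scanner is advanced and re-queued until every file is exhausted. Below is a minimal, self-contained sketch of that pattern in plain Java. PeekingSource and the String keys are hypothetical stand-ins for FileStruct and the real row data, not the Cassandra classes.

    import java.util.*;

    // A sketch of the k-way merge pattern in doFileCompaction. PeekingSource
    // stands in for FileStruct: it exposes the current key of one sorted
    // source and can advance to the next.
    public class MergeSketch
    {
        static class PeekingSource implements Comparable<PeekingSource>
        {
            final Iterator<String> it;
            String key;

            PeekingSource(Iterator<String> it)
            {
                this.it = it;
                this.key = it.next();
            }

            boolean advance()
            {
                if (!it.hasNext())
                    return false;   // exhausted, like FileStruct.isExhausted()
                key = it.next();
                return true;
            }

            public int compareTo(PeekingSource other)
            {
                return key.compareTo(other.key);
            }
        }

        public static List<String> merge(List<Iterator<String>> sources)
        {
            PriorityQueue<PeekingSource> pq = new PriorityQueue<PeekingSource>();
            for (Iterator<String> it : sources)
                if (it.hasNext())
                    pq.add(new PeekingSource(it));

            List<String> merged = new ArrayList<String>();
            List<PeekingSource> lfs = new ArrayList<PeekingSource>(); // same-key group
            String lastkey = null;
            while (!pq.isEmpty() || !lfs.isEmpty())
            {
                PeekingSource fs = pq.poll(); // null once the queue drains
                if (fs != null && (lastkey == null || lastkey.equals(fs.key)))
                {
                    // same key seen in another source: group it for joint resolution
                    lastkey = fs.key;
                    lfs.add(fs);
                }
                else
                {
                    // "resolve" the group; the real code merges ColumnFamilies here
                    merged.add(lastkey);
                    for (PeekingSource source : lfs)
                        if (source.advance())
                            pq.add(source); // re-queue sources that still have rows
                    lfs.clear();
                    lastkey = null;
                    if (fs != null)
                        pq.add(fs); // put back the entry that started a new key
                }
            }
            return merged;
        }

        public static void main(String[] args)
        {
            List<Iterator<String>> sources = Arrays.asList(
                Arrays.asList("a", "c", "d").iterator(),
                Arrays.asList("b", "c").iterator());
            System.out.println(merge(sources)); // prints [a, b, c, d]
        }
    }

The same invariant drives both loops: the queue holds at most one entry per source, so equal keys surface consecutively and can be merged before anything is written out.
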
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java Tue Apr 14 05:12:32 2009
@@ -81,16 +81,16 @@
 
         public void run()
         {
+                logger_.debug("Started  compaction 
..."+columnFamilyStore_.columnFamily_);
             try
             {
-                logger_.debug("Started  compaction 
..."+columnFamilyStore_.columnFamily_);
-               columnFamilyStore_.doCompaction();
-                logger_.debug("Finished compaction 
..."+columnFamilyStore_.columnFamily_);
+                columnFamilyStore_.doCompaction();
             }
-            catch (Throwable th)
+            catch (IOException e)
             {
-                logger_.error( LogUtil.throwableToString(th) );
+                throw new RuntimeException(e);
             }
+            logger_.debug("Finished compaction 
..."+columnFamilyStore_.columnFamily_);
         }
     }
 
@@ -149,16 +149,9 @@
 
         public void run()
         {
-            try
-            {
-                logger_.debug("Started  compaction 
..."+columnFamilyStore_.columnFamily_);
-               columnFamilyStore_.doCleanupCompaction();
-                logger_.debug("Finished compaction 
..."+columnFamilyStore_.columnFamily_);
-            }
-            catch (Throwable th)
-            {
-                logger_.error( LogUtil.throwableToString(th) );
-            }
+            logger_.debug("Started  compaction 
..."+columnFamilyStore_.columnFamily_);
+            columnFamilyStore_.doCleanupCompaction();
+            logger_.debug("Finished compaction 
..."+columnFamilyStore_.columnFamily_);
         }
     }
     
@@ -181,9 +174,9 @@
                        MinorCompactionManager.intervalInMins_, TimeUnit.MINUTES);
     }
 
-    public void submit(ColumnFamilyStore columnFamilyStore)
+    public Future submit(ColumnFamilyStore columnFamilyStore)
     {
-        compactor_.submit(new FileCompactor(columnFamilyStore));
+        return compactor_.submit(new FileCompactor(columnFamilyStore));
     }
     
     public void submitCleanup(ColumnFamilyStore columnFamilyStore)

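The two changes above work together: run() now rethrows IOException as a RuntimeException instead of logging a Throwable, and submit() hands the executor's Future back to the caller, so a test can block on get() and observe the failure as an ExecutionException. A minimal sketch of that mechanism with plain java.util.concurrent, nothing Cassandra-specific:

    import java.io.IOException;
    import java.util.concurrent.*;

    // Demonstrates why returning the Future matters: an exception thrown
    // inside the task is captured by the executor and rethrown to the
    // caller from get(), wrapped in an ExecutionException. With the old
    // void submit() and a catch-all inside run(), the failure was logged
    // and lost.
    public class FuturePropagation
    {
        public static void main(String[] args) throws InterruptedException
        {
            ExecutorService compactor = Executors.newSingleThreadExecutor();
            Future<?> ft = compactor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        throw new IOException("simulated compaction failure");
                    }
                    catch (IOException e)
                    {
                        // same pattern as FileCompactor.run() in this commit
                        throw new RuntimeException(e);
                    }
                }
            });
            try
            {
                ft.get(); // blocks until the task finishes, then rethrows
            }
            catch (ExecutionException e)
            {
                System.out.println("caller sees: " + e.getCause());
            }
            compactor.shutdown();
        }
    }

The flip side: if no one ever calls get(), the FutureTask quietly holds the captured exception, so fire-and-forget submissions still surface nothing on their own; keeping the returned Future is what lets the new test avoid that.
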
Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java Tue Apr 14 05:12:32 2009
@@ -314,4 +314,26 @@
         fos.close();
         return f.getAbsolutePath();
     }
+
+    @Test
+    public void testCompaction() throws IOException, ColumnFamilyNotDefinedException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open("Table1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+        store.ssTables_.clear(); // TODO integrate this better into test setup/teardown
+
+        for (int j = 0; j < 5; j++) {
+            for (int i = 0; i < 10; i++) {
+                long epoch = System.currentTimeMillis() / 1000;
+                String key = String.format("%s.%s.%s", epoch, 1, i);
+                RowMutation rm = new RowMutation("Table1", key);
+                rm.add("Standard1:A", new byte[0], epoch);
+                rm.apply();
+            }
+            store.forceFlush();
+            waitForFlush();
+        }
+        Future ft = MinorCompactionManager.instance().submit(store);
+        ft.get();
+    }
 }
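The test above is a compact recipe: each forceFlush seals the current memtable into its own data file, five rounds push the store past the compaction threshold, and ft.get() turns any exception escaping doCompaction into a test failure rather than a log line. Below is a runnable toy sketch of the same shape; the store, its method names (write, forceFlush, submitCompaction), and the threshold of four are illustrative assumptions, not the real Cassandra API.

    import java.util.*;
    import java.util.concurrent.*;

    // Shape of the regression test against a toy in-memory store. Each
    // forceFlush seals one "data file", so five iterations cross the toy
    // threshold of four; blocking on the Future turns any compaction
    // error into a failure instead of a swallowed log line.
    public class CompactionTestShape
    {
        static final int THRESHOLD = 4; // assumed value for the sketch

        Map<String, byte[]> memtable = new TreeMap<String, byte[]>();
        List<Map<String, byte[]>> dataFiles = new ArrayList<Map<String, byte[]>>();
        ExecutorService compactor = Executors.newSingleThreadExecutor();

        void write(String key, byte[] value)
        {
            memtable.put(key, value);
        }

        void forceFlush()
        {
            dataFiles.add(memtable);               // seal the current memtable
            memtable = new TreeMap<String, byte[]>();
        }

        Future<?> submitCompaction()               // mirrors MinorCompactionManager.submit
        {
            return compactor.submit(new Runnable()
            {
                public void run()
                {
                    if (dataFiles.size() < THRESHOLD)
                        return;
                    // merge all files into one, later entries winning
                    Map<String, byte[]> merged = new TreeMap<String, byte[]>();
                    for (Map<String, byte[]> file : dataFiles)
                        merged.putAll(file);
                    dataFiles.clear();
                    dataFiles.add(merged);
                }
            });
        }

        public static void main(String[] args) throws Exception
        {
            CompactionTestShape store = new CompactionTestShape();
            for (int flush = 0; flush < 5; flush++)
            {
                long epoch = System.currentTimeMillis() / 1000;
                for (int i = 0; i < 10; i++)
                {
                    // epoch-prefixed keys keep reruns from colliding with stale files
                    store.write(String.format("%s.%s.%s", epoch, 1, i), new byte[0]);
                }
                store.forceFlush(); // one "data file" per iteration
            }
            store.submitCompaction().get(); // an ExecutionException here fails the test
            System.out.println("files after compaction: " + store.dataFiles.size()); // 1
            store.compactor.shutdown();
        }
    }
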

