Author: jbellis
Date: Thu Apr  2 20:48:02 2009
New Revision: 761423

URL: http://svn.apache.org/viewvc?rev=761423&view=rev
Log:
rename stageOrderedCompaction -> getCompactionBuckets.  clean up code to make 
the algorithm clearer, and to allow grouping with more than just the 
previous bucket.  add testGetCompactionBuckets

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=761423&r1=761422&r2=761423&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java 
Thu Apr  2 20:48:02 2009
@@ -30,6 +30,7 @@
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -707,48 +708,47 @@
     }
 
     /*
-     * Stage the compactions , compact similar size files.
-     * This fn figures out the files close enough by size and if they
-     * are greater than the threshold then compacts.
+     * Group files of similar size into buckets.
      */
-    Map<Integer, List<String>> stageOrderedCompaction(List<String> files)
+    static Set<List<String>> getCompactionBuckets(List<String> files, long min)
     {
-        // Sort the files based on the generation ID 
-        Collections.sort(files, new 
FileNameComparator(FileNameComparator.Ascending));
-       Map<Integer, List<String>>  buckets = new HashMap<Integer, 
List<String>>();
-       int maxBuckets = 1000;
-       long averages[] = new long[maxBuckets];
-       long min = 50L*1024L*1024L;
-       Integer i = 0;
-       for(String file : files)
+       Map<List<String>, Long> buckets = new HashMap<List<String>, Long>();
+       for(String fname : files)
        {
-               File f = new File(file);
+               File f = new File(fname);
                long size = f.length();
-                       if ( (size > averages[i]/2 && size < 3*averages[i]/2) 
|| ( size < min && averages[i] < min ))
-                       {
-                               averages[i] = (averages[i] + size) / 2 ;
-                               List<String> fileList = buckets.get(i);
-                               if(fileList == null)
-                               {
-                                       fileList = new ArrayList<String>();
-                                       buckets.put(i, fileList);
-                               }
-                               fileList.add(file);
-                       }
-                       else
+
+               boolean bFound = false;
+            // look for a bucket containing similar-sized files:
+            // group in the same bucket if it's within 50% of the average for this bucket,
+            // or this file and the bucket are all considered "small" (less than `min`)
+            for (List<String> bucket : new 
ArrayList<List<String>>(buckets.keySet()))
+               {
+                long averageSize = buckets.get(bucket);
+                if ((size > averageSize/2 && size < 3*averageSize/2)
+                    || ( size < min && averageSize < min))
+                       {
+                    // remove and re-add because adding changes the hash
+                    buckets.remove(bucket);
+                               averageSize = (averageSize + size) / 2 ;
+                    bucket.add(fname);
+                    buckets.put(bucket, averageSize);
+                               bFound = true;
+                               break;
+                       }
+               }
+            // no similar bucket found; put it in a new one
+               if(!bFound)
                {
-                               if( i >= maxBuckets )
-                                       break;
-                               i++;
-                               List<String> fileList = new ArrayList<String>();
-                               buckets.put(i, fileList);
-                               fileList.add(file);
-                       averages[i] = size;
+                ArrayList<String> bucket = new ArrayList<String>();
+                bucket.add(fname);
+                buckets.put(bucket, size);
                }
        }
-       return buckets;
+
+        return buckets.keySet();
     }
-       
+
     /*
      * Break the files into buckets and then compact.
      */
@@ -759,11 +759,8 @@
         try
         {
                int count = 0;
-               Map<Integer, List<String>> buckets = 
stageOrderedCompaction(files);
-               Set<Integer> keySet = buckets.keySet();
-               for(Integer key : keySet)
-               {
-                       List<String> fileList = buckets.get(key);
+               for(List<String> fileList : getCompactionBuckets(files, 
50L*1024L*1024L))
+            {
                        Collections.sort( fileList , new FileNameComparator( 
FileNameComparator.Ascending));
                        if(fileList.size() >= threshHold_ )
                        {

Modified: 
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=761423&r1=761422&r2=761423&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
 (original)
+++ 
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
 Thu Apr  2 20:48:02 2009
@@ -4,17 +4,24 @@
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Set;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.Future;
 import java.util.concurrent.ExecutionException;
 
-public class ColumnFamilyStoreTest extends ServerTest {
+public class ColumnFamilyStoreTest extends ServerTest
+{
     static byte[] bytes1, bytes2;
-    static {
+
+    static
+    {
         Random random = new Random();
         bytes1 = new byte[1024];
         bytes2 = new byte[128];
@@ -31,14 +38,14 @@
         {
             String key = Integer.toString(i);
             RowMutation rm;
-            for ( int j = 0; j < 8; ++j )
+            for (int j = 0; j < 8; ++j)
             {
                 byte[] bytes = j % 2 == 0 ? bytes1 : bytes2;
                 rm = new RowMutation("Table1", key);
                 rm.add("Standard1:" + "Column-" + j, bytes, j);
                 rm.apply();
 
-                for ( int k = 0; k < 4; ++k )
+                for (int k = 0; k < 4; ++k)
                 {
                     bytes = (j + k) % 2 == 0 ? bytes1 : bytes2;
                     rm = new RowMutation("Table1", key);
@@ -55,7 +62,9 @@
         // wait for flush to finish
         Future f = MemtableManager.instance().flusher_.submit(new Runnable()
         {
-            public void run() {}
+            public void run()
+            {
+            }
         });
         f.get();
 
@@ -65,7 +74,7 @@
     private void validateBytes(Table table)
             throws ColumnFamilyNotDefinedException, IOException
     {
-        for ( int i = 900; i < 1000; ++i )
+        for (int i = 900; i < 1000; ++i)
         {
             String key = Integer.toString(i);
             ColumnFamily cf;
@@ -99,7 +108,8 @@
     }
 
     @Test
-    public void testRemove() throws IOException, 
ColumnFamilyNotDefinedException {
+    public void testRemove() throws IOException, 
ColumnFamilyNotDefinedException
+    {
         Table table = Table.open("Table1");
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
         RowMutation rm;
@@ -122,7 +132,8 @@
     }
 
     @Test
-    public void testRemoveSuperColumn() throws IOException, 
ColumnFamilyNotDefinedException {
+    public void testRemoveSuperColumn() throws IOException, 
ColumnFamilyNotDefinedException
+    {
         Table table = Table.open("Table1");
         ColumnFamilyStore store = table.getColumnFamilyStore("Super1");
         RowMutation rm;
@@ -152,4 +163,42 @@
         assert subColumns.iterator().next().timestamp() == 0;
         assert ColumnFamilyStore.removeDeleted(resolved).getColumnCount() == 0;
     }
+
+    @Test
+    public void testGetCompactionBuckets() throws IOException
+    {
+        // create files 20 40 60 ... 140
+        List<String> small = new ArrayList<String>();
+        List<String> med = new ArrayList<String>();
+        List<String> all = new ArrayList<String>();
+
+        String fname;
+        fname = createFile(20);
+        small.add(fname);
+        all.add(fname);
+        fname = createFile(40);
+        small.add(fname);
+        all.add(fname);
+
+        for (int i = 60; i <= 140; i += 20)
+        {
+            fname = createFile(i);
+            med.add(fname);
+            all.add(fname);
+        }
+
+        Set<List<String>> buckets = 
ColumnFamilyStore.getCompactionBuckets(all, 50);
+        assert buckets.contains(small);
+        assert buckets.contains(med);
+    }
+
+    private String createFile(int nBytes) throws IOException
+    {
+        File f = File.createTempFile("bucket_test", "");
+        FileOutputStream fos = new FileOutputStream(f);
+        byte[] bytes = new byte[nBytes];
+        fos.write(bytes);
+        fos.close();
+        return f.getAbsolutePath();
+    }
 }


Reply via email to