Hey Jonathan, are you testing these changes? If we are going to make changes like this without a strong reason, we had better test them, since this code needs to run in our production environment. You need to test the changes at a reasonable scale.
- Prashant On Thu, Apr 2, 2009 at 1:48 PM, <[email protected]> wrote: > Author: jbellis > Date: Thu Apr 2 20:48:02 2009 > New Revision: 761423 > > URL: http://svn.apache.org/viewvc?rev=761423&view=rev > Log: > rename stageOrderedCompaction -> getCompactionBuckets. clean up code to > make the algorithm more clear, and to allow grouping with more than just the > previous bucket. add getCompactionBucketsTest > > Modified: > > incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java > > > incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java > > Modified: > incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java > URL: > http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=761423&r1=761422&r2=761423&view=diff > > ============================================================================== > --- > incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java > (original) > +++ > incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java > Thu Apr 2 20:48:02 2009 > @@ -30,6 +30,7 @@ > import java.util.PriorityQueue; > import java.util.Set; > import java.util.StringTokenizer; > +import java.util.Arrays; > import java.util.concurrent.ExecutionException; > import java.util.concurrent.Future; > import java.util.concurrent.atomic.AtomicBoolean; > @@ -707,48 +708,47 @@ > } > > /* > - * Stage the compactions , compact similar size files. > - * This fn figures out the files close enough by size and if they > - * are greater than the threshold then compacts. > + * Group files of similar size into buckets. 
> */ > - Map<Integer, List<String>> stageOrderedCompaction(List<String> files) > + static Set<List<String>> getCompactionBuckets(List<String> files, long > min) > { > - // Sort the files based on the generation ID > - Collections.sort(files, new > FileNameComparator(FileNameComparator.Ascending)); > - Map<Integer, List<String>> buckets = new HashMap<Integer, > List<String>>(); > - int maxBuckets = 1000; > - long averages[] = new long[maxBuckets]; > - long min = 50L*1024L*1024L; > - Integer i = 0; > - for(String file : files) > + Map<List<String>, Long> buckets = new HashMap<List<String>, > Long>(); > + for(String fname : files) > { > - File f = new File(file); > + File f = new File(fname); > long size = f.length(); > - if ( (size > averages[i]/2 && size < > 3*averages[i]/2) || ( size < min && averages[i] < min )) > - { > - averages[i] = (averages[i] + size) / 2 ; > - List<String> fileList = buckets.get(i); > - if(fileList == null) > - { > - fileList = new ArrayList<String>(); > - buckets.put(i, fileList); > - } > - fileList.add(file); > - } > - else > + > + boolean bFound = false; > + // look for a bucket containing similar-sized files: > + // group in the same bucket if it's w/in 50% of the average > for this bucket, > + // or this file and the bucket are all considered "small" > (less than `min`) > + for (List<String> bucket : new > ArrayList<List<String>>(buckets.keySet())) > + { > + long averageSize = buckets.get(bucket); > + if ((size > averageSize/2 && size < 3*averageSize/2) > + || ( size < min && averageSize < min)) > + { > + // remove and re-add because adding changes the hash > + buckets.remove(bucket); > + averageSize = (averageSize + size) / 2 ; > + bucket.add(fname); > + buckets.put(bucket, averageSize); > + bFound = true; > + break; > + } > + } > + // no similar bucket found; put it in a new one > + if(!bFound) > { > - if( i >= maxBuckets ) > - break; > - i++; > - List<String> fileList = new > ArrayList<String>(); > - buckets.put(i, fileList); > - 
fileList.add(file); > - averages[i] = size; > + ArrayList<String> bucket = new ArrayList<String>(); > + bucket.add(fname); > + buckets.put(bucket, size); > } > } > - return buckets; > + > + return buckets.keySet(); > } > - > + > /* > * Break the files into buckets and then compact. > */ > @@ -759,11 +759,8 @@ > try > { > int count = 0; > - Map<Integer, List<String>> buckets = > stageOrderedCompaction(files); > - Set<Integer> keySet = buckets.keySet(); > - for(Integer key : keySet) > - { > - List<String> fileList = buckets.get(key); > + for(List<String> fileList : getCompactionBuckets(files, > 50L*1024L*1024L)) > + { > Collections.sort( fileList , new FileNameComparator( > FileNameComparator.Ascending)); > if(fileList.size() >= threshHold_ ) > { > > Modified: > incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java > URL: > http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=761423&r1=761422&r2=761423&view=diff > > ============================================================================== > --- > incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java > (original) > +++ > incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java > Thu Apr 2 20:48:02 2009 > @@ -4,17 +4,24 @@ > import org.testng.annotations.Test; > > import java.io.IOException; > +import java.io.File; > +import java.io.FileOutputStream; > import java.util.Collection; > import java.util.List; > import java.util.Random; > import java.util.Arrays; > +import java.util.ArrayList; > +import java.util.Set; > import java.util.concurrent.FutureTask; > import java.util.concurrent.Future; > import java.util.concurrent.ExecutionException; > > -public class ColumnFamilyStoreTest extends ServerTest { > +public class ColumnFamilyStoreTest extends ServerTest > +{ > static byte[] bytes1, bytes2; > - static { > + > + static > + { > Random random = new Random(); > bytes1 = new 
byte[1024]; > bytes2 = new byte[128]; > @@ -31,14 +38,14 @@ > { > String key = Integer.toString(i); > RowMutation rm; > - for ( int j = 0; j < 8; ++j ) > + for (int j = 0; j < 8; ++j) > { > byte[] bytes = j % 2 == 0 ? bytes1 : bytes2; > rm = new RowMutation("Table1", key); > rm.add("Standard1:" + "Column-" + j, bytes, j); > rm.apply(); > > - for ( int k = 0; k < 4; ++k ) > + for (int k = 0; k < 4; ++k) > { > bytes = (j + k) % 2 == 0 ? bytes1 : bytes2; > rm = new RowMutation("Table1", key); > @@ -55,7 +62,9 @@ > // wait for flush to finish > Future f = MemtableManager.instance().flusher_.submit(new > Runnable() > { > - public void run() {} > + public void run() > + { > + } > }); > f.get(); > > @@ -65,7 +74,7 @@ > private void validateBytes(Table table) > throws ColumnFamilyNotDefinedException, IOException > { > - for ( int i = 900; i < 1000; ++i ) > + for (int i = 900; i < 1000; ++i) > { > String key = Integer.toString(i); > ColumnFamily cf; > @@ -99,7 +108,8 @@ > } > > @Test > - public void testRemove() throws IOException, > ColumnFamilyNotDefinedException { > + public void testRemove() throws IOException, > ColumnFamilyNotDefinedException > + { > Table table = Table.open("Table1"); > ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); > RowMutation rm; > @@ -122,7 +132,8 @@ > } > > @Test > - public void testRemoveSuperColumn() throws IOException, > ColumnFamilyNotDefinedException { > + public void testRemoveSuperColumn() throws IOException, > ColumnFamilyNotDefinedException > + { > Table table = Table.open("Table1"); > ColumnFamilyStore store = table.getColumnFamilyStore("Super1"); > RowMutation rm; > @@ -152,4 +163,42 @@ > assert subColumns.iterator().next().timestamp() == 0; > assert ColumnFamilyStore.removeDeleted(resolved).getColumnCount() > == 0; > } > + > + @Test > + public void testGetCompactionBuckets() throws IOException > + { > + // create files 20 40 60 ... 
180 > + List<String> small = new ArrayList<String>(); > + List<String> med = new ArrayList<String>(); > + List<String> all = new ArrayList<String>(); > + > + String fname; > + fname = createFile(20); > + small.add(fname); > + all.add(fname); > + fname = createFile(40); > + small.add(fname); > + all.add(fname); > + > + for (int i = 60; i <= 140; i += 20) > + { > + fname = createFile(i); > + med.add(fname); > + all.add(fname); > + } > + > + Set<List<String>> buckets = > ColumnFamilyStore.getCompactionBuckets(all, 50); > + assert buckets.contains(small); > + assert buckets.contains(med); > + } > + > + private String createFile(int nBytes) throws IOException > + { > + File f = File.createTempFile("bucket_test", ""); > + FileOutputStream fos = new FileOutputStream(f); > + byte[] bytes = new byte[nBytes]; > + fos.write(bytes); > + fos.close(); > + return f.getAbsolutePath(); > + } > } > > >
