Author: jbellis
Date: Thu Apr 2 20:48:02 2009
New Revision: 761423
URL: http://svn.apache.org/viewvc?rev=761423&view=rev
Log:
rename stageOrderedCompaction -> getCompactionBuckets. clean up code to make
the algorithm more clear, and to allow grouping with more than just the
previous bucket. add getCompactionBucketsTest
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=761423&r1=761422&r2=761423&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Thu Apr 2 20:48:02 2009
@@ -30,6 +30,7 @@
import java.util.PriorityQueue;
import java.util.Set;
import java.util.StringTokenizer;
+import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -707,48 +708,47 @@
}
/*
- * Stage the compactions , compact similar size files.
- * This fn figures out the files close enough by size and if they
- * are greater than the threshold then compacts.
+ * Group files of similar size into buckets.
*/
- Map<Integer, List<String>> stageOrderedCompaction(List<String> files)
+ static Set<List<String>> getCompactionBuckets(List<String> files, long min)
{
- // Sort the files based on the generation ID
-        Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
-        Map<Integer, List<String>> buckets = new HashMap<Integer, List<String>>();
- int maxBuckets = 1000;
- long averages[] = new long[maxBuckets];
- long min = 50L*1024L*1024L;
- Integer i = 0;
- for(String file : files)
+ Map<List<String>, Long> buckets = new HashMap<List<String>, Long>();
+ for(String fname : files)
{
- File f = new File(file);
+ File f = new File(fname);
long size = f.length();
-            if ( (size > averages[i]/2 && size < 3*averages[i]/2) || ( size < min && averages[i] < min ))
- {
- averages[i] = (averages[i] + size) / 2 ;
- List<String> fileList = buckets.get(i);
- if(fileList == null)
- {
- fileList = new ArrayList<String>();
- buckets.put(i, fileList);
- }
- fileList.add(file);
- }
- else
+
+ boolean bFound = false;
+ // look for a bucket containing similar-sized files:
+            // group in the same bucket if it's w/in 50% of the average for this bucket,
+            // or this file and the bucket are all considered "small" (less than `min`)
+            for (List<String> bucket : new ArrayList<List<String>>(buckets.keySet()))
+ {
+ long averageSize = buckets.get(bucket);
+ if ((size > averageSize/2 && size < 3*averageSize/2)
+ || ( size < min && averageSize < min))
+ {
+ // remove and re-add because adding changes the hash
+ buckets.remove(bucket);
+ averageSize = (averageSize + size) / 2 ;
+ bucket.add(fname);
+ buckets.put(bucket, averageSize);
+ bFound = true;
+ break;
+ }
+ }
+ // no similar bucket found; put it in a new one
+ if(!bFound)
{
- if( i >= maxBuckets )
- break;
- i++;
- List<String> fileList = new ArrayList<String>();
- buckets.put(i, fileList);
- fileList.add(file);
- averages[i] = size;
+ ArrayList<String> bucket = new ArrayList<String>();
+ bucket.add(fname);
+ buckets.put(bucket, size);
}
}
- return buckets;
+
+ return buckets.keySet();
}
-
+
/*
* Break the files into buckets and then compact.
*/
@@ -759,11 +759,8 @@
try
{
int count = 0;
-            Map<Integer, List<String>> buckets = stageOrderedCompaction(files);
- Set<Integer> keySet = buckets.keySet();
- for(Integer key : keySet)
- {
- List<String> fileList = buckets.get(key);
+            for(List<String> fileList : getCompactionBuckets(files, 50L*1024L*1024L))
+ {
                Collections.sort( fileList , new FileNameComparator( FileNameComparator.Ascending));
if(fileList.size() >= threshHold_ )
{
Modified:
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=761423&r1=761422&r2=761423&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java Thu Apr 2 20:48:02 2009
@@ -4,17 +4,24 @@
import org.testng.annotations.Test;
import java.io.IOException;
+import java.io.File;
+import java.io.FileOutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Set;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
-public class ColumnFamilyStoreTest extends ServerTest {
+public class ColumnFamilyStoreTest extends ServerTest
+{
static byte[] bytes1, bytes2;
- static {
+
+ static
+ {
Random random = new Random();
bytes1 = new byte[1024];
bytes2 = new byte[128];
@@ -31,14 +38,14 @@
{
String key = Integer.toString(i);
RowMutation rm;
- for ( int j = 0; j < 8; ++j )
+ for (int j = 0; j < 8; ++j)
{
byte[] bytes = j % 2 == 0 ? bytes1 : bytes2;
rm = new RowMutation("Table1", key);
rm.add("Standard1:" + "Column-" + j, bytes, j);
rm.apply();
- for ( int k = 0; k < 4; ++k )
+ for (int k = 0; k < 4; ++k)
{
bytes = (j + k) % 2 == 0 ? bytes1 : bytes2;
rm = new RowMutation("Table1", key);
@@ -55,7 +62,9 @@
// wait for flush to finish
Future f = MemtableManager.instance().flusher_.submit(new Runnable()
{
- public void run() {}
+ public void run()
+ {
+ }
});
f.get();
@@ -65,7 +74,7 @@
private void validateBytes(Table table)
throws ColumnFamilyNotDefinedException, IOException
{
- for ( int i = 900; i < 1000; ++i )
+ for (int i = 900; i < 1000; ++i)
{
String key = Integer.toString(i);
ColumnFamily cf;
@@ -99,7 +108,8 @@
}
@Test
-    public void testRemove() throws IOException, ColumnFamilyNotDefinedException {
+    public void testRemove() throws IOException, ColumnFamilyNotDefinedException
+ {
Table table = Table.open("Table1");
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
RowMutation rm;
@@ -122,7 +132,8 @@
}
@Test
-    public void testRemoveSuperColumn() throws IOException, ColumnFamilyNotDefinedException {
+    public void testRemoveSuperColumn() throws IOException, ColumnFamilyNotDefinedException
+ {
Table table = Table.open("Table1");
ColumnFamilyStore store = table.getColumnFamilyStore("Super1");
RowMutation rm;
@@ -152,4 +163,42 @@
assert subColumns.iterator().next().timestamp() == 0;
assert ColumnFamilyStore.removeDeleted(resolved).getColumnCount() == 0;
}
+
+ @Test
+ public void testGetCompactionBuckets() throws IOException
+ {
+        // create files 20 40 60 ... 140
+ List<String> small = new ArrayList<String>();
+ List<String> med = new ArrayList<String>();
+ List<String> all = new ArrayList<String>();
+
+ String fname;
+ fname = createFile(20);
+ small.add(fname);
+ all.add(fname);
+ fname = createFile(40);
+ small.add(fname);
+ all.add(fname);
+
+ for (int i = 60; i <= 140; i += 20)
+ {
+ fname = createFile(i);
+ med.add(fname);
+ all.add(fname);
+ }
+
+        Set<List<String>> buckets = ColumnFamilyStore.getCompactionBuckets(all, 50);
+ assert buckets.contains(small);
+ assert buckets.contains(med);
+ }
+
+ private String createFile(int nBytes) throws IOException
+ {
+ File f = File.createTempFile("bucket_test", "");
+ FileOutputStream fos = new FileOutputStream(f);
+ byte[] bytes = new byte[nBytes];
+ fos.write(bytes);
+ fos.close();
+ return f.getAbsolutePath();
+ }
}