Author: jbellis
Date: Tue Apr 14 05:12:32 2009
New Revision: 764678
URL: http://svn.apache.org/viewvc?rev=764678&view=rev
Log:
add compaction test showing regression. patch by jbellis; reviewed by Todd Lipcon and Jun Rao for #80.
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Tue Apr 14 05:12:32 2009
@@ -74,7 +74,7 @@
private AtomicReference<BinaryMemtable> binaryMemtable_;
/* SSTables on disk for this column family */
- private Set<String> ssTables_ = new HashSet<String>();
+ Set<String> ssTables_ = new HashSet<String>();
/* Modification lock used for protecting reads from compactions. */
private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -162,6 +162,7 @@
{
HintedHandOffManager.instance().submit(this);
}
+ // TODO this seems unnecessary -- each memtable flush checks to see if it needs to compact, too
MinorCompactionManager.instance().submitPeriodicCompaction(this);
}
@@ -671,7 +672,7 @@
}
catch ( Exception ex)
{
- ex.printStackTrace();
+ logger_.warn("corrupt file? or are you just blowing away
data files manually out from under me?", ex);
try
{
if (fs != null)
@@ -734,7 +735,7 @@
/*
* Break the files into buckets and then compact.
*/
- void doCompaction()
+ void doCompaction() throws IOException
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
@@ -755,17 +756,10 @@
if( count == threshHold_ )
break;
}
- try
- {
- // For each bucket if it has crossed the threshhold do the compaction
- // In case of range compaction merge the counting bloom filters also.
- if( count == threshHold_)
- doFileCompaction(files, bufSize_);
- }
- catch ( Exception ex)
- {
- logger_.warn(LogUtil.throwableToString(ex));
- }
+ // For each bucket if it has crossed the threshhold do the compaction
+ // In case of range compaction merge the counting bloom filters also.
+ if( count == threshHold_)
+ doFileCompaction(files, bufSize_);
}
}
}
@@ -1175,7 +1169,7 @@
* to get the latest data.
*
*/
- void doFileCompaction(List<String> files, int minBufferSize)
+ void doFileCompaction(List<String> files, int minBufferSize) throws IOException
{
String newfile = null;
long startTime = System.currentTimeMillis();
@@ -1183,175 +1177,168 @@
long totalBytesWritten = 0;
long totalkeysRead = 0;
long totalkeysWritten = 0;
- try
- {
- // Calculate the expected compacted filesize
- long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
- String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
- // If the compaction file path is null that means we have no space left for this compaction.
- if( compactionFileLocation == null )
- {
- String maxFile = getMaxSizeFile( files );
- files.remove( maxFile );
- doFileCompaction(files , minBufferSize);
- return;
- }
- PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
- if (pq.size() > 0)
- {
- String mergedFileName = getTempFileName( files );
- SSTable ssTable = null;
- String lastkey = null;
- List<FileStruct> lfs = new ArrayList<FileStruct>();
- DataOutputBuffer bufOut = new DataOutputBuffer();
- int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
- expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
- logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
- /* Create the bloom filter for the compacted file. */
- BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
- List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+ // Calculate the expected compacted filesize
+ long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
+ String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
+ // If the compaction file path is null that means we have no space left for this compaction.
+ if( compactionFileLocation == null )
+ {
+ String maxFile = getMaxSizeFile( files );
+ files.remove( maxFile );
+ doFileCompaction(files , minBufferSize);
+ return;
+ }
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
+ if (pq.size() > 0)
+ {
+ String mergedFileName = getTempFileName( files );
+ SSTable ssTable = null;
+ String lastkey = null;
+ List<FileStruct> lfs = new ArrayList<FileStruct>();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+ /* Create the bloom filter for the compacted file. */
+ BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
- while (pq.size() > 0 || lfs.size() > 0)
- {
- FileStruct fs = null;
- if (pq.size() > 0)
- {
- fs = pq.poll();
- }
- if (fs != null
- && (lastkey == null ||
lastkey.equals(fs.getKey())))
- {
- // The keys are the same so we need to add this to the
- // ldfs list
- lastkey = fs.getKey();
- lfs.add(fs);
- }
- else
- {
- Collections.sort(lfs, new FileStructComparator());
- ColumnFamily columnFamily;
- bufOut.reset();
- if(lfs.size() > 1)
- {
- for (FileStruct filestruct : lfs)
- {
- try
- {
- /* read the length although we don't need it */
- filestruct.getBufIn().readInt();
- // Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
- // We want to add only 2 and resolve them right there in order to save on memory footprint
- if(columnFamilies.size() > 1)
- {
- merge(columnFamilies);
- }
- // deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
- }
- catch ( Exception ex)
- {
- logger_.warn("error in filecompaction",
ex);
+ while (pq.size() > 0 || lfs.size() > 0)
+ {
+ FileStruct fs = null;
+ if (pq.size() > 0)
+ {
+ fs = pq.poll();
+ }
+ if (fs != null
+ && (lastkey == null || lastkey.equals(fs.getKey())))
+ {
+ // The keys are the same so we need to add this to the
+ // ldfs list
+ lastkey = fs.getKey();
+ lfs.add(fs);
+ }
+ else
+ {
+ Collections.sort(lfs, new FileStructComparator());
+ ColumnFamily columnFamily;
+ bufOut.reset();
+ if(lfs.size() > 1)
+ {
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ /* read the length although we don't need it */
+ filestruct.getBufIn().readInt();
+ // Skip the Index
+ IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if(columnFamilies.size() > 1)
+ {
+ merge(columnFamilies);
}
- }
- // Now after merging all crap append to the sstable
- columnFamily = resolveAndRemoveDeleted(columnFamilies);
- columnFamilies.clear();
- if( columnFamily != null )
- {
- /* serialize the cf with column indexes */
- ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
- }
- }
- else
- {
- FileStruct filestruct = lfs.get(0);
- try
- {
- /* read the length although we don't need it */
- int size = filestruct.getBufIn().readInt();
- bufOut.write(filestruct.getBufIn(), size);
- }
- catch ( Exception ex)
- {
- ex.printStackTrace();
- filestruct.close();
- continue;
- }
- }
-
- if ( ssTable == null )
- {
- ssTable = new SSTable(compactionFileLocation, mergedFileName);
- }
- ssTable.append(lastkey, bufOut);
+ // deserialize into column families
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+ }
+ catch ( Exception ex)
+ {
+ logger_.warn("error in filecompaction", ex);
+ }
+ }
+ // Now after merging all crap append to the sstable
+ columnFamily = resolveAndRemoveDeleted(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+ }
+ }
+ else
+ {
+ FileStruct filestruct = lfs.get(0);
+ try
+ {
+ /* read the length although we don't need it */
+ int size = filestruct.getBufIn().readInt();
+ bufOut.write(filestruct.getBufIn(), size);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ filestruct.close();
+ continue;
+ }
+ }
- /* Fill the bloom filter with the key */
- doFill(compactedBloomFilter, lastkey);
- totalkeysWritten++;
- for (FileStruct filestruct : lfs)
- {
- try
- {
- filestruct.advance();
- if (filestruct.isExhausted())
- {
- continue;
- }
- pq.add(filestruct);
- totalkeysRead++;
- }
- catch ( Throwable ex )
- {
- // Ignore the exception as it might be a corrupted file
- // in any case we have read as far as possible from it
- // and it will be deleted after compaction.
- filestruct.close();
+ if ( ssTable == null )
+ {
+ ssTable = new SSTable(compactionFileLocation, mergedFileName);
+ }
+ ssTable.append(lastkey, bufOut);
+
+ /* Fill the bloom filter with the key */
+ doFill(compactedBloomFilter, lastkey);
+ totalkeysWritten++;
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ filestruct.advance();
+ if (filestruct.isExhausted())
+ {
+ continue;
}
- }
- lfs.clear();
- lastkey = null;
- if (fs != null)
- {
- /* Add back the fs since we processed the rest of filestructs */
- pq.add(fs);
- }
- }
- }
- if ( ssTable != null )
- {
- ssTable.closeRename(compactedBloomFilter);
- newfile = ssTable.getDataFileLocation();
- }
- lock_.writeLock().lock();
- try
- {
- for (String file : files)
- {
- ssTables_.remove(file);
- SSTable.removeAssociatedBloomFilter(file);
- }
- if ( newfile != null )
- {
- ssTables_.add(newfile);
- logger_.debug("Inserting bloom filter for file " +
newfile);
- SSTable.storeBloomFilter(newfile,
compactedBloomFilter);
- totalBytesWritten = (new File(newfile)).length();
- }
- }
- finally
- {
- lock_.writeLock().unlock();
- }
- for (String file : files)
- {
- SSTable.delete(file);
- }
- }
- }
- catch ( Exception ex)
- {
- logger_.warn( LogUtil.throwableToString(ex) );
+ pq.add(filestruct);
+ totalkeysRead++;
+ }
+ catch ( Throwable ex )
+ {
+ // Ignore the exception as it might be a corrupted file
+ // in any case we have read as far as possible from it
+ // and it will be deleted after compaction.
+ filestruct.close();
+ }
+ }
+ lfs.clear();
+ lastkey = null;
+ if (fs != null)
+ {
+ /* Add back the fs since we processed the rest of filestructs */
+ pq.add(fs);
+ }
+ }
+ }
+ if ( ssTable != null )
+ {
+ ssTable.closeRename(compactedBloomFilter);
+ newfile = ssTable.getDataFileLocation();
+ }
+ lock_.writeLock().lock();
+ try
+ {
+ for (String file : files)
+ {
+ ssTables_.remove(file);
+ SSTable.removeAssociatedBloomFilter(file);
+ }
+ if ( newfile != null )
+ {
+ ssTables_.add(newfile);
+ logger_.debug("Inserting bloom filter for file " +
newfile);
+ SSTable.storeBloomFilter(newfile, compactedBloomFilter);
+ totalBytesWritten = (new File(newfile)).length();
+ }
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ for (String file : files)
+ {
+ SSTable.delete(file);
+ }
}
logger_.debug("Total time taken for compaction ..."
+ (System.currentTimeMillis() - startTime));
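
The net effect of the ColumnFamilyStore changes above: compaction failures are no longer caught and logged inside doCompaction()/doFileCompaction(); both methods now declare IOException and let errors escape to the caller. A condensed before/after sketch of the pattern, pieced together from the hunks above (illustrative, not literal file contents):

    // before: any failure was logged and forgotten, so callers could not
    // distinguish a successful compaction from a broken one
    try
    {
        doFileCompaction(files, bufSize_);
    }
    catch (Exception ex)
    {
        logger_.warn(LogUtil.throwableToString(ex));
    }

    // after: the checked exception escapes to the caller, which can fail
    // fast -- or, as in the new test below, assert on it
    doFileCompaction(files, bufSize_); // doCompaction() now throws IOException
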
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java Tue Apr 14 05:12:32 2009
@@ -81,16 +81,16 @@
public void run()
{
+ logger_.debug("Started compaction
..."+columnFamilyStore_.columnFamily_);
try
{
- logger_.debug("Started compaction
..."+columnFamilyStore_.columnFamily_);
- columnFamilyStore_.doCompaction();
- logger_.debug("Finished compaction
..."+columnFamilyStore_.columnFamily_);
+ columnFamilyStore_.doCompaction();
}
- catch (Throwable th)
+ catch (IOException e)
{
- logger_.error( LogUtil.throwableToString(th) );
+ throw new RuntimeException(e);
}
+ logger_.debug("Finished compaction
..."+columnFamilyStore_.columnFamily_);
}
}
@@ -149,16 +149,9 @@
public void run()
{
- try
- {
- logger_.debug("Started compaction
..."+columnFamilyStore_.columnFamily_);
- columnFamilyStore_.doCleanupCompaction();
- logger_.debug("Finished compaction
..."+columnFamilyStore_.columnFamily_);
- }
- catch (Throwable th)
- {
- logger_.error( LogUtil.throwableToString(th) );
- }
+ logger_.debug("Started compaction
..."+columnFamilyStore_.columnFamily_);
+ columnFamilyStore_.doCleanupCompaction();
+ logger_.debug("Finished compaction
..."+columnFamilyStore_.columnFamily_);
}
}
@@ -181,9 +174,9 @@
MinorCompactionManager.intervalInMins_,
TimeUnit.MINUTES);
}
- public void submit(ColumnFamilyStore columnFamilyStore)
+ public Future submit(ColumnFamilyStore columnFamilyStore)
{
- compactor_.submit(new FileCompactor(columnFamilyStore));
+ return compactor_.submit(new FileCompactor(columnFamilyStore));
}
public void submitCleanup(ColumnFamilyStore columnFamilyStore)
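
Returning the Future from submit() is what lets callers observe compaction failures: FileCompactor.run() now rethrows IOException as a RuntimeException, and the executor wraps anything thrown from run() in an ExecutionException delivered by Future.get(). A minimal caller-side sketch (names taken from this diff):

    Future ft = MinorCompactionManager.instance().submit(store);
    try
    {
        ft.get(); // block until the FileCompactor task completes
    }
    catch (ExecutionException e)
    {
        // the RuntimeException wrapping the compaction's IOException
        // surfaces here instead of disappearing into the log
        throw new RuntimeException(e.getCause());
    }
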
Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java Tue Apr 14 05:12:32 2009
@@ -314,4 +314,26 @@
fos.close();
return f.getAbsolutePath();
}
+
+ @Test
+ public void testCompaction() throws IOException, ColumnFamilyNotDefinedException, ExecutionException, InterruptedException
+ {
+ Table table = Table.open("Table1");
+ ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+ store.ssTables_.clear(); // TODO integrate this better into test setup/teardown
+
+ for (int j = 0; j < 5; j++) {
+ for (int i = 0; i < 10; i++) {
+ long epoch = System.currentTimeMillis() / 1000;
+ String key = String.format("%s.%s.%s", epoch, 1, i);
+ RowMutation rm = new RowMutation("Table1", key);
+ rm.add("Standard1:A", new byte[0], epoch);
+ rm.apply();
+ }
+ store.forceFlush();
+ waitForFlush();
+ }
+ Future ft = MinorCompactionManager.instance().submit(store);
+ ft.get();
+ }
}
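
The test writes ten rows and flushes, five times over, so at least five SSTables exist on disk; it then submits a minor compaction and blocks on the returned Future, so any exception thrown during compaction fails the test instead of merely being logged -- this is what exposes the regression for #80. Note that waitForFlush() is an existing helper defined elsewhere in ColumnFamilyStoreTest, not part of this diff; a crude, hypothetical stand-in (assuming flushes complete within a second) might be:

    // hypothetical helper for illustration only -- the real waitForFlush()
    // lives elsewhere in this test class
    private void waitForFlush() throws InterruptedException
    {
        Thread.sleep(1000); // give the flush thread time to write the SSTable
    }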