Author: jbellis
Date: Wed Dec 23 19:18:05 2009
New Revision: 893600
URL: http://svn.apache.org/viewvc?rev=893600&view=rev
Log:
clean up compactions api and move into CompactionManager. patch by jbellis;
reviewed by eevans for CASSANDRA-599
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Dec 23 19:18:05 2009
@@ -35,7 +35,6 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
import java.net.InetAddress;
import java.util.regex.Matcher;
@@ -49,7 +48,6 @@
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.collections.PredicateUtils;
@@ -292,31 +290,6 @@
sb.append(newLineSeparator);
return sb.toString();
}
-
- /*
- * This method forces a compaction of the SSTables on disk. We wait
- * for the process to complete by waiting on a future pointer.
- */
- List<SSTableReader> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
- {
- assert ranges != null;
- Future<List<SSTableReader>> futurePtr =
CompactionManager.instance.submitAnti(ColumnFamilyStore.this,
-
ranges, target);
-
- List<SSTableReader> result;
- try
- {
- /* Waiting for the compaction to complete. */
- result = futurePtr.get();
- if (logger_.isDebugEnabled())
- logger_.debug("Done forcing compaction ...");
- }
- catch (Exception ex)
- {
- throw new RuntimeException(ex);
- }
- return result;
- }
/**
* @return the name of the column family
@@ -501,12 +474,7 @@
*/
static ColumnFamily removeDeleted(ColumnFamily cf)
{
- return removeDeleted(cf, getDefaultGCBefore());
- }
-
- public static int getDefaultGCBefore()
- {
- return (int)(System.currentTimeMillis() / 1000) -
DatabaseDescriptor.getGcGraceInSeconds();
+ return removeDeleted(cf, CompactionManager.getDefaultGCBefore());
}
public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
@@ -597,109 +565,6 @@
}
/*
- * Group files of similar size into buckets.
- */
- static Set<List<SSTableReader>>
getCompactionBuckets(Iterable<SSTableReader> files, long min)
- {
- Map<List<SSTableReader>, Long> buckets = new
HashMap<List<SSTableReader>, Long>();
- for (SSTableReader sstable : files)
- {
- long size = sstable.length();
-
- boolean bFound = false;
- // look for a bucket containing similar-sized files:
- // group in the same bucket if it's w/in 50% of the average for
this bucket,
- // or this file and the bucket are all considered "small" (less
than `min`)
- for (List<SSTableReader> bucket : buckets.keySet())
- {
- long averageSize = buckets.get(bucket);
- if ((size > averageSize / 2 && size < 3 * averageSize / 2)
- || (size < min && averageSize < min))
- {
- // remove and re-add because adding changes the hash
- buckets.remove(bucket);
- averageSize = (averageSize + size) / 2;
- bucket.add(sstable);
- buckets.put(bucket, averageSize);
- bFound = true;
- break;
- }
- }
- // no similar bucket found; put it in a new one
- if (!bFound)
- {
- ArrayList<SSTableReader> bucket = new
ArrayList<SSTableReader>();
- bucket.add(sstable);
- buckets.put(bucket, size);
- }
- }
-
- return buckets.keySet();
- }
-
- /*
- * Break the files into buckets and then compact.
- */
- int doCompaction(int minThreshold, int maxThreshold) throws IOException
- {
- int filesCompacted = 0;
- if (minThreshold > 0 && maxThreshold > 0)
- {
- logger_.debug("Checking to see if compaction of " + columnFamily_
+ " would be useful");
- for (List<SSTableReader> sstables :
getCompactionBuckets(ssTables_, 50L * 1024L * 1024L))
- {
- if (sstables.size() < minThreshold)
- {
- continue;
- }
- // if we have too many to compact all at once, compact older
ones first -- this avoids
- // re-compacting files we just created.
- Collections.sort(sstables);
- filesCompacted += doFileCompaction(sstables.subList(0,
Math.min(sstables.size(), maxThreshold)));
- }
- logger_.debug(filesCompacted + " files compacted");
- }
- else
- {
- logger_.debug("Compaction is currently disabled.");
- }
- return filesCompacted;
- }
-
- void doMajorCompaction(long skip) throws IOException
- {
- doMajorCompactionInternal(skip);
- }
-
- /*
- * Compact all the files irrespective of the size.
- * skip : is the amount in GB of the files to be skipped
- * all files greater than skip GB are skipped for this compaction.
- * Except if skip is 0 , in that case this is ignored and all files are
taken.
- */
- void doMajorCompactionInternal(long skip) throws IOException
- {
- Collection<SSTableReader> sstables;
- if (skip > 0)
- {
- sstables = new ArrayList<SSTableReader>();
- for (SSTableReader sstable : ssTables_)
- {
- if (sstable.length() < skip * 1024L * 1024L * 1024L)
- {
- sstables.add(sstable);
- }
- }
- }
- else
- {
- sstables = ssTables_.getSSTables();
- }
-
- doFileCompaction(sstables);
- }
-
- /*
* Add up all the files sizes this is the worst case file
* size for compaction of all the list of files given.
*/
@@ -732,277 +597,41 @@
return maxFile;
}
- List<SSTableReader> doAntiCompaction(Collection<Range> ranges, InetAddress
target) throws IOException
- {
- return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target);
- }
-
void forceCleanup()
{
CompactionManager.instance.submitCleanup(ColumnFamilyStore.this);
}
- /**
- * This function goes over each file and removes the keys that the node is
not responsible for
- * and only keeps keys that this node is responsible for.
- *
- * @throws IOException
- */
- void doCleanupCompaction() throws IOException
- {
- for (SSTableReader sstable : ssTables_)
- {
- doCleanup(sstable);
- }
- gcAfterRpcTimeout();
- }
-
- /**
- * cleans up one particular file by removing keys that this node is not
responsible for.
- * @throws IOException
- */
- /* TODO: Take care of the comments later. */
- void doCleanup(SSTableReader sstable) throws IOException
- {
- assert sstable != null;
- List<SSTableReader> sstables =
doFileAntiCompaction(Arrays.asList(sstable),
StorageService.instance().getLocalRanges(), null);
- if (!sstables.isEmpty())
- {
- assert sstables.size() == 1;
- addSSTable(sstables.get(0));
- }
- if (logger_.isDebugEnabled())
- logger_.debug("Original file : " + sstable + " of size " +
sstable.length());
- ssTables_.markCompacted(Arrays.asList(sstable));
- }
-
- /**
- * This function is used to do the anti compaction process , it spits out
the file which has keys that belong to a given range
- * If the target is not specified it spits out the file as a compacted
file with the unecessary ranges wiped out.
- *
- * @param sstables
- * @param ranges
- * @param target
- * @return
- * @throws IOException
- */
- List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader>
sstables, final Collection<Range> ranges, InetAddress target) throws IOException
+ public Table getTable()
{
- logger_.info("AntiCompacting [" + StringUtils.join(sstables, ",") +
"]");
- // Calculate the expected compacted filesize
- long expectedRangeFileSize = getExpectedCompactedFileSize(sstables) /
2;
- String compactionFileLocation =
DatabaseDescriptor.getDataFileLocationForTable(table_, expectedRangeFileSize);
- if (compactionFileLocation == null)
- {
- throw new UnsupportedOperationException("disk full");
- }
- if (target != null)
- {
- // compacting for streaming: send to subdirectory
- compactionFileLocation = compactionFileLocation + File.separator +
DatabaseDescriptor.STREAMING_SUBDIR;
- }
- List<SSTableReader> results = new ArrayList<SSTableReader>();
-
- long startTime = System.currentTimeMillis();
- long totalkeysWritten = 0;
-
- int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(),
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
- if (logger_.isDebugEnabled())
- logger_.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
-
- SSTableWriter writer = null;
- CompactionIterator ci = new AntiCompactionIterator(sstables, ranges,
getDefaultGCBefore(), sstables.size() == ssTables_.size());
- Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
-
try
{
- if (!nni.hasNext())
- {
- return results;
- }
-
- while (nni.hasNext())
- {
- CompactionIterator.CompactedRow row = nni.next();
- if (writer == null)
- {
- FileUtils.createDirectory(compactionFileLocation);
- String newFilename = new File(compactionFileLocation,
getTempSSTableFileName()).getAbsolutePath();
- writer = new SSTableWriter(newFilename,
expectedBloomFilterSize, StorageService.getPartitioner());
- }
- writer.append(row.key, row.buffer);
- totalkeysWritten++;
- }
- }
- finally
- {
- ci.close();
+ return Table.open(table_);
}
-
- if (writer != null)
+ catch (IOException e)
{
-
results.add(writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table_)));
- String format = "AntiCompacted to %s. %d/%d bytes for %d keys.
Time: %dms.";
- long dTime = System.currentTimeMillis() - startTime;
- logger_.info(String.format(format, writer.getFilename(),
getTotalBytes(sstables), results.get(0).length(), totalkeysWritten, dTime));
+ throw new RuntimeException(e);
}
-
- return results;
- }
-
- private int doFileCompaction(Collection<SSTableReader> sstables) throws
IOException
- {
- return doFileCompaction(sstables, getDefaultGCBefore());
}
- /*
- * This function does the actual compaction for files.
- * It maintains a priority queue of with the first key from each file
- * and then removes the top of the queue and adds it to the SStable and
- * repeats this process while reading the next from each file until its
- * done with all the files . The SStable to which the keys are written
- * represents the new compacted file. Before writing if there are keys
- * that occur in multiple files and are the same then a resolution is done
- * to get the latest data.
- *
- * The collection of sstables passed may be empty (but not null); even if
- * it is not empty, it may compact down to nothing if all rows are deleted.
- */
- int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore)
throws IOException
+ void markCompacted(Collection<SSTableReader> sstables) throws IOException
{
- if (DatabaseDescriptor.isSnapshotBeforeCompaction())
- Table.open(table_).snapshot("compact-" + columnFamily_);
- logger_.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
- String compactionFileLocation =
DatabaseDescriptor.getDataFileLocationForTable(table_,
getExpectedCompactedFileSize(sstables));
- // If the compaction file path is null that means we have no space
left for this compaction.
- // try again w/o the largest one.
- if (compactionFileLocation == null)
- {
- SSTableReader maxFile = getMaxSizeFile(sstables);
- List<SSTableReader> smallerSSTables = new
ArrayList<SSTableReader>(sstables);
- smallerSSTables.remove(maxFile);
- return doFileCompaction(smallerSSTables, gcBefore);
- }
-
- // new sstables from flush can be added during a compaction, but only
the compaction can remove them,
- // so in our single-threaded compaction world this is a valid way of
determining if we're compacting
- // all the sstables (that existed when we started)
- boolean major = sstables.size() == ssTables_.size();
-
- long startTime = System.currentTimeMillis();
- long totalkeysWritten = 0;
-
- // TODO the int cast here is potentially buggy
- int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(),
(int)SSTableReader.getApproximateKeyCount(sstables));
- if (logger_.isDebugEnabled())
- logger_.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
-
- SSTableWriter writer;
- CompactionIterator ci = new CompactionIterator(sstables, gcBefore,
major); // retain a handle so we can call close()
- Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
-
- try
- {
- if (!nni.hasNext())
- {
- // don't mark compacted in the finally block, since if there
_is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no
period during which
- // a crash could cause data loss.
- ssTables_.markCompacted(sstables);
- return 0;
- }
-
- String newFilename = new File(compactionFileLocation,
getTempSSTableFileName()).getAbsolutePath();
- writer = new SSTableWriter(newFilename, expectedBloomFilterSize,
StorageService.getPartitioner());
-
- // validate the CF as we iterate over it
- AntiEntropyService.IValidator validator =
AntiEntropyService.instance().getValidator(table_, columnFamily_, null, major);
- validator.prepare();
- while (nni.hasNext())
- {
- CompactionIterator.CompactedRow row = nni.next();
- writer.append(row.key, row.buffer);
- validator.add(row);
- totalkeysWritten++;
- }
- validator.complete();
- }
- finally
- {
- ci.close();
- }
-
- SSTableReader ssTable =
writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table_));
- ssTables_.add(ssTable);
ssTables_.markCompacted(sstables);
- gcAfterRpcTimeout();
- CompactionManager.instance.submitMinor(ColumnFamilyStore.this);
-
- String format = "Compacted to %s. %d/%d bytes for %d keys. Time:
%dms.";
- long dTime = System.currentTimeMillis() - startTime;
- logger_.info(String.format(format, writer.getFilename(),
getTotalBytes(sstables), ssTable.length(), totalkeysWritten, dTime));
- return sstables.size();
}
- /**
- * perform a GC to clean out obsolete sstables, sleeping rpc timeout first
so that most in-progress ops can complete
- * (thus, no longer reference the sstables in question)
- */
- private void gcAfterRpcTimeout()
+ boolean isCompleteSSTables(Collection<SSTableReader> sstables)
{
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- Thread.sleep(DatabaseDescriptor.getRpcTimeout());
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- System.gc();
- }
- }).start();
+ return ssTables_.getSSTables().equals(new
HashSet<SSTableReader>(sstables));
}
- /**
- * Performs a readonly "compaction" of all sstables in order to validate
complete rows,
- * but without writing the merge result
- */
- void doReadonlyCompaction(InetAddress initiator) throws IOException
+ void replaceCompactedSSTables(Collection<SSTableReader> sstables,
Iterable<SSTableReader> replacements)
+ throws IOException
{
- Collection<SSTableReader> sstables = ssTables_.getSSTables();
- CompactionIterator ci = new CompactionIterator(sstables,
getDefaultGCBefore(), true);
- try
+ for (SSTableReader sstable : replacements)
{
- Iterator<CompactionIterator.CompactedRow> nni = new
FilterIterator(ci, PredicateUtils.notNullPredicate());
-
- // validate the CF as we iterate over it
- AntiEntropyService.IValidator validator =
AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator,
true);
- validator.prepare();
- while (nni.hasNext())
- {
- CompactionIterator.CompactedRow row = nni.next();
- validator.add(row);
- }
- validator.complete();
+ ssTables_.add(sstable);
}
- finally
- {
- ci.close();
- }
- }
-
- private long getTotalBytes(Iterable<SSTableReader> sstables)
- {
- long sum = 0;
- for (SSTableReader sstable : sstables)
- {
- sum += sstable.length();
- }
- return sum;
+ ssTables_.markCompacted(sstables);
}
public static List<Memtable> getUnflushedMemtables(String cfName)
@@ -1169,7 +798,7 @@
public ColumnFamily getColumnFamily(QueryFilter filter) throws IOException
{
- return getColumnFamily(filter, getDefaultGCBefore());
+ return getColumnFamily(filter, CompactionManager.getDefaultGCBefore());
}
/**
@@ -1594,40 +1223,4 @@
memtable_.clearUnsafe();
ssTables_.clearUnsafe();
}
-
- private static class AntiCompactionIterator extends CompactionIterator
- {
- public AntiCompactionIterator(Collection<SSTableReader> sstables,
Collection<Range> ranges, int gcBefore, boolean isMajor)
- throws IOException
- {
- super(getCollatedRangeIterator(sstables, ranges), gcBefore,
isMajor);
- }
-
- private static Iterator
getCollatedRangeIterator(Collection<SSTableReader> sstables, final
Collection<Range> ranges)
- throws IOException
- {
- org.apache.commons.collections.Predicate rangesPredicate = new
org.apache.commons.collections.Predicate()
- {
- public boolean evaluate(Object row)
- {
- return
Range.isTokenInRanges(((IteratingRow)row).getKey().token, ranges);
- }
- };
- CollatingIterator iter =
FBUtilities.<IteratingRow>getCollatingIterator();
- for (SSTableReader sstable : sstables)
- {
- SSTableScanner scanner = sstable.getScanner(FILE_BUFFER_SIZE);
- iter.addIterator(new FilterIterator(scanner, rangesPredicate));
- }
- return iter;
- }
-
- public void close() throws IOException
- {
- for (Object o : ((CollatingIterator)source).getIterators())
- {
- ((SSTableScanner)((FilterIterator)o).getIterator()).close();
- }
- }
- }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
Wed Dec 23 19:18:05 2009
@@ -19,9 +19,9 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.io.File;
import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.Collection;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -31,9 +31,22 @@
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
import java.net.InetAddress;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.collections.iterators.FilterIterator;
+import org.apache.commons.collections.iterators.CollatingIterator;
+import org.apache.commons.collections.PredicateUtils;
+
+import com.sun.corba.se.impl.logging.POASystemException;
+
public class CompactionManager implements CompactionManagerMBean
{
public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.db:type=CompactionManager";
@@ -64,18 +77,34 @@
* It's okay to over-call (within reason) since the compactions are
single-threaded,
* and if a call is unnecessary, it will just be no-oped in the bucketing
phase.
*/
- public Future<Integer> submitMinor(final ColumnFamilyStore
columnFamilyStore)
- {
- return submitMinor(columnFamilyStore, minimumCompactionThreshold,
maximumCompactionThreshold);
- }
-
- Future<Integer> submitMinor(final ColumnFamilyStore cfStore, final int
minThreshold, final int maxThreshold)
+ public Future<Integer> submitMinor(final ColumnFamilyStore cfs)
{
Callable<Integer> callable = new Callable<Integer>()
{
public Integer call() throws IOException
{
- return cfStore.doCompaction(minThreshold, maxThreshold);
+ int filesCompacted = 0;
+ if (minimumCompactionThreshold > 0 &&
maximumCompactionThreshold > 0)
+ {
+ logger.debug("Checking to see if compaction of " +
cfs.columnFamily_ + " would be useful");
+ for (List<SSTableReader> sstables :
getCompactionBuckets(cfs.getSSTables(), 50L * 1024L * 1024L))
+ {
+ if (sstables.size() < minimumCompactionThreshold)
+ {
+ continue;
+ }
+ // if we have too many to compact all at once, compact
older ones first -- this avoids
+ // re-compacting files we just created.
+ Collections.sort(sstables);
+ filesCompacted += doCompaction(cfs,
sstables.subList(0, Math.min(sstables.size(), maximumCompactionThreshold)),
getDefaultGCBefore());
+ }
+ logger.debug(filesCompacted + " files compacted");
+ }
+ else
+ {
+ logger.debug("Compaction is currently disabled.");
+ }
+ return filesCompacted;
}
};
return compactor_.submit(callable);
@@ -87,32 +116,54 @@
{
public Object call() throws IOException
{
- cfStore.doCleanupCompaction();
+ doCleanupCompaction(cfStore);
return this;
}
};
return compactor_.submit(runnable);
}
- public Future<List<SSTableReader>> submitAnti(final ColumnFamilyStore
cfStore, final Collection<Range> ranges, final InetAddress target)
+ public Future<List<SSTableReader>> submitAnticompaction(final
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress
target)
{
Callable<List<SSTableReader>> callable = new
Callable<List<SSTableReader>>()
{
public List<SSTableReader> call() throws IOException
{
- return cfStore.doAntiCompaction(ranges, target);
+ return doAntiCompaction(cfStore, cfStore.getSSTables(),
ranges, target);
}
};
return compactor_.submit(callable);
}
- public Future submitMajor(final ColumnFamilyStore cfStore, final long skip)
+ public Future submitMajor(final ColumnFamilyStore cfStore)
+ {
+ return submitMajor(cfStore, 0, getDefaultGCBefore());
+ }
+
+ public Future submitMajor(final ColumnFamilyStore cfStore, final long
skip, final int gcBefore)
{
Callable<Object> callable = new Callable<Object>()
{
public Object call() throws IOException
{
- cfStore.doMajorCompaction(skip);
+ Collection<SSTableReader> sstables;
+ if (skip > 0)
+ {
+ sstables = new ArrayList<SSTableReader>();
+ for (SSTableReader sstable : cfStore.getSSTables())
+ {
+ if (sstable.length() < skip * 1024L * 1024L * 1024L)
+ {
+ sstables.add(sstable);
+ }
+ }
+ }
+ else
+ {
+ sstables = cfStore.getSSTables();
+ }
+
+ doCompaction(cfStore, sstables, gcBefore);
return this;
}
};
@@ -125,7 +176,7 @@
{
public Object call() throws IOException
{
- cfStore.doReadonlyCompaction(initiator);
+ doReadonlyCompaction(cfStore, initiator);
return this;
}
};
@@ -164,9 +215,318 @@
maximumCompactionThreshold = threshold;
}
- public void disableCompactions()
+ public void disableAutoCompaction()
{
minimumCompactionThreshold = 0;
maximumCompactionThreshold = 0;
}
+
+ /**
+ * For internal use and testing only. The rest of the system should go
through the submit* methods,
+ * which are properly serialized.
+ */
+ int doCompaction(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, int gcBefore) throws IOException
+ {
+ // The collection of sstables passed may be empty (but not null); even
if
+ // it is not empty, it may compact down to nothing if all rows are
deleted.
+ Table table = cfs.getTable();
+ if (DatabaseDescriptor.isSnapshotBeforeCompaction())
+ table.snapshot("compact-" + cfs.columnFamily_);
+ logger.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
+ String compactionFileLocation =
table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables));
+ // If the compaction file path is null that means we have no space
left for this compaction.
+ // try again w/o the largest one.
+ if (compactionFileLocation == null)
+ {
+ SSTableReader maxFile = cfs.getMaxSizeFile(sstables);
+ List<SSTableReader> smallerSSTables = new
ArrayList<SSTableReader>(sstables);
+ smallerSSTables.remove(maxFile);
+ return doCompaction(cfs, smallerSSTables, gcBefore);
+ }
+
+ // new sstables from flush can be added during a compaction, but only
the compaction can remove them,
+ // so in our single-threaded compaction world this is a valid way of
determining if we're compacting
+ // all the sstables (that existed when we started)
+ boolean major = cfs.isCompleteSSTables(sstables);
+
+ long startTime = System.currentTimeMillis();
+ long totalkeysWritten = 0;
+
+ // TODO the int cast here is potentially buggy
+ int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(),
(int)SSTableReader.getApproximateKeyCount(sstables));
+ if (logger.isDebugEnabled())
+ logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
+
+ SSTableWriter writer;
+ CompactionIterator ci = new CompactionIterator(sstables, gcBefore,
major); // retain a handle so we can call close()
+ Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
+
+ try
+ {
+ if (!nni.hasNext())
+ {
+ // don't mark compacted in the finally block, since if there
_is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so there is no
period during which
+ // a crash could cause data loss.
+ cfs.markCompacted(sstables);
+ return 0;
+ }
+
+ String newFilename = new File(compactionFileLocation,
cfs.getTempSSTableFileName()).getAbsolutePath();
+ writer = new SSTableWriter(newFilename, expectedBloomFilterSize,
StorageService.getPartitioner());
+
+ // validate the CF as we iterate over it
+ AntiEntropyService.IValidator validator =
AntiEntropyService.instance().getValidator(table.name,
cfs.getColumnFamilyName(), null, major);
+ validator.prepare();
+ while (nni.hasNext())
+ {
+ CompactionIterator.CompactedRow row = nni.next();
+ writer.append(row.key, row.buffer);
+ validator.add(row);
+ totalkeysWritten++;
+ }
+ validator.complete();
+ }
+ finally
+ {
+ ci.close();
+ }
+
+ SSTableReader ssTable =
writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table.name));
+ cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
+ gcAfterRpcTimeout();
+ instance.submitMinor(cfs);
+
+ String format = "Compacted to %s. %d/%d bytes for %d keys. Time:
%dms.";
+ long dTime = System.currentTimeMillis() - startTime;
+ logger.info(String.format(format, writer.getFilename(),
SSTable.getTotalBytes(sstables), ssTable.length(), totalkeysWritten, dTime));
+ return sstables.size();
+ }
+
+ /**
+ * This function is used to do the anti compaction process , it spits out
the file which has keys that belong to a given range
+ * If the target is not specified it spits out the file as a compacted
file with the unecessary ranges wiped out.
+ *
+ * @param cfs
+ * @param sstables
+ * @param ranges
+ * @param target
+ * @return
+ * @throws java.io.IOException
+ */
+ private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
+ throws IOException
+ {
+ Table table = cfs.getTable();
+ logger.info("AntiCompacting [" + StringUtils.join(sstables, ",") +
"]");
+ // Calculate the expected compacted filesize
+ long expectedRangeFileSize =
cfs.getExpectedCompactedFileSize(sstables) / 2;
+ String compactionFileLocation =
DatabaseDescriptor.getDataFileLocationForTable(table.name,
expectedRangeFileSize);
+ if (compactionFileLocation == null)
+ {
+ throw new UnsupportedOperationException("disk full");
+ }
+ if (target != null)
+ {
+ // compacting for streaming: send to subdirectory
+ compactionFileLocation = compactionFileLocation + File.separator +
DatabaseDescriptor.STREAMING_SUBDIR;
+ }
+ List<SSTableReader> results = new ArrayList<SSTableReader>();
+
+ long startTime = System.currentTimeMillis();
+ long totalkeysWritten = 0;
+
+ int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(),
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
+ if (logger.isDebugEnabled())
+ logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
+
+ SSTableWriter writer = null;
+ CompactionIterator ci = new AntiCompactionIterator(sstables, ranges,
getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
+ Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
+
+ try
+ {
+ if (!nni.hasNext())
+ {
+ return results;
+ }
+
+ while (nni.hasNext())
+ {
+ CompactionIterator.CompactedRow row = nni.next();
+ if (writer == null)
+ {
+ FileUtils.createDirectory(compactionFileLocation);
+ String newFilename = new File(compactionFileLocation,
cfs.getTempSSTableFileName()).getAbsolutePath();
+ writer = new SSTableWriter(newFilename,
expectedBloomFilterSize, StorageService.getPartitioner());
+ }
+ writer.append(row.key, row.buffer);
+ totalkeysWritten++;
+ }
+ }
+ finally
+ {
+ ci.close();
+ }
+
+ if (writer != null)
+ {
+
results.add(writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table.name)));
+ String format = "AntiCompacted to %s. %d/%d bytes for %d keys.
Time: %dms.";
+ long dTime = System.currentTimeMillis() - startTime;
+ logger.info(String.format(format, writer.getFilename(),
SSTable.getTotalBytes(sstables), results.get(0).length(), totalkeysWritten,
dTime));
+ }
+
+ return results;
+ }
+
+ /**
+ * This function goes over each file and removes the keys that the node is
not responsible for
+ * and only keeps keys that this node is responsible for.
+ *
+ * @throws IOException
+ */
+ private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
+ {
+ Collection<SSTableReader> originalSSTables = cfs.getSSTables();
+ List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables,
StorageService.instance().getLocalRanges(), null);
+ if (!sstables.isEmpty())
+ {
+ cfs.replaceCompactedSSTables(originalSSTables, sstables);
+ }
+ CompactionManager.gcAfterRpcTimeout();
+ }
+
+ /**
+ * Performs a readonly "compaction" of all sstables in order to validate
complete rows,
+ * but without writing the merge result
+ */
+ private void doReadonlyCompaction(ColumnFamilyStore cfs, InetAddress
initiator) throws IOException
+ {
+ Collection<SSTableReader> sstables = cfs.getSSTables();
+ CompactionIterator ci = new CompactionIterator(sstables,
getDefaultGCBefore(), true);
+ try
+ {
+ Iterator<CompactionIterator.CompactedRow> nni = new
FilterIterator(ci, PredicateUtils.notNullPredicate());
+
+ // validate the CF as we iterate over it
+ AntiEntropyService.IValidator validator =
AntiEntropyService.instance().getValidator(cfs.getTable().name,
cfs.getColumnFamilyName(), initiator, true);
+ validator.prepare();
+ while (nni.hasNext())
+ {
+ CompactionIterator.CompactedRow row = nni.next();
+ validator.add(row);
+ }
+ validator.complete();
+ }
+ finally
+ {
+ ci.close();
+ }
+ }
+
+ /**
+ * perform a GC to clean out obsolete sstables, sleeping rpc timeout first
so that most in-progress ops can complete
+ * (thus, no longer reference the sstables in question)
+ */
+ static void gcAfterRpcTimeout()
+ {
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(DatabaseDescriptor.getRpcTimeout());
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ System.gc();
+ }
+ }).start();
+ }
+
+ /*
+ * Group files of similar size into buckets.
+ */
+ static Set<List<SSTableReader>>
getCompactionBuckets(Iterable<SSTableReader> files, long min)
+ {
+ Map<List<SSTableReader>, Long> buckets = new
HashMap<List<SSTableReader>, Long>();
+ for (SSTableReader sstable : files)
+ {
+ long size = sstable.length();
+
+ boolean bFound = false;
+ // look for a bucket containing similar-sized files:
+ // group in the same bucket if it's w/in 50% of the average for
this bucket,
+ // or this file and the bucket are all considered "small" (less
than `min`)
+ for (List<SSTableReader> bucket : buckets.keySet())
+ {
+ long averageSize = buckets.get(bucket);
+ if ((size > averageSize / 2 && size < 3 * averageSize / 2)
+ || (size < min && averageSize < min))
+ {
+ // remove and re-add because adding changes the hash
+ buckets.remove(bucket);
+ averageSize = (averageSize + size) / 2;
+ bucket.add(sstable);
+ buckets.put(bucket, averageSize);
+ bFound = true;
+ break;
+ }
+ }
+ // no similar bucket found; put it in a new one
+ if (!bFound)
+ {
+ ArrayList<SSTableReader> bucket = new
ArrayList<SSTableReader>();
+ bucket.add(sstable);
+ buckets.put(bucket, size);
+ }
+ }
+
+ return buckets.keySet();
+ }
+
+ public static int getDefaultGCBefore()
+ {
+ return (int)(System.currentTimeMillis() / 1000) -
DatabaseDescriptor.getGcGraceInSeconds();
+ }
+
+ private static class AntiCompactionIterator extends CompactionIterator
+ {
+ public AntiCompactionIterator(Collection<SSTableReader> sstables,
Collection<Range> ranges, int gcBefore, boolean isMajor)
+ throws IOException
+ {
+ super(getCollatedRangeIterator(sstables, ranges), gcBefore,
isMajor);
+ }
+
+ private static Iterator
getCollatedRangeIterator(Collection<SSTableReader> sstables, final
Collection<Range> ranges)
+ throws IOException
+ {
+ org.apache.commons.collections.Predicate rangesPredicate = new
org.apache.commons.collections.Predicate()
+ {
+ public boolean evaluate(Object row)
+ {
+ return
Range.isTokenInRanges(((IteratingRow)row).getKey().token, ranges);
+ }
+ };
+ CollatingIterator iter =
FBUtilities.<IteratingRow>getCollatingIterator();
+ for (SSTableReader sstable : sstables)
+ {
+ SSTableScanner scanner = sstable.getScanner(FILE_BUFFER_SIZE);
+ iter.addIterator(new FilterIterator(scanner, rangesPredicate));
+ }
+ return iter;
+ }
+
+ public void close() throws IOException
+ {
+ for (Object o : ((CollatingIterator)source).getIterators())
+ {
+ ((SSTableScanner)((FilterIterator)o).getIterator()).close();
+ }
+ }
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Wed Dec 23 19:18:05 2009
@@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.io.IOException;
@@ -187,7 +188,14 @@
}
}
hintStore.forceFlush();
- hintStore.doMajorCompaction(0);
+ try
+ {
+ CompactionManager.instance.submitMajor(hintStore).get();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
if (logger_.isDebugEnabled())
logger_.debug("Finished deliverAllHints");
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Dec 23 19:18:05 2009
@@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
@@ -298,7 +299,14 @@
continue;
ColumnFamilyStore cfStore = columnFamilyStores.get( columnFamily );
- allResults.addAll(cfStore.forceAntiCompaction(ranges, target));
+ try
+ {
+ allResults.addAll(CompactionManager.instance.submitAnticompaction(cfStore, ranges, target).get());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
return allResults;
}
@@ -314,7 +322,7 @@
{
ColumnFamilyStore cfStore = columnFamilyStores.get( columnFamily );
if ( cfStore != null )
- CompactionManager.instance.submitMajor(cfStore, 0);
+ CompactionManager.instance.submitMajor(cfStore);
}
}
@@ -474,6 +482,11 @@
return applicationColumnFamilies;
}
+ public String getDataFileLocation(long expectedCompactedFileSize)
+ {
+ return DatabaseDescriptor.getDataFileLocationForTable(name, expectedCompactedFileSize);
+ }
+
public static String getSnapshotPath(String dataDirPath, String tableName,
String snapshotName)
{
return dataDirPath + File.separator + tableName + File.separator +
SNAPSHOT_SUBDIR_NAME + File.separator + snapshotName;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed Dec 23 19:18:05 2009
@@ -161,6 +161,16 @@
logger.info("Deleted " + path);
}
+ public static long getTotalBytes(Iterable<SSTableReader> sstables)
+ {
+ long sum = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ sum += sstable.length();
+ }
+ return sum;
+ }
+
/**
* This is a simple container for the index Key and its corresponding position
* in the data file. Binary search is performed on a list of these objects
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Dec 23 19:18:05 2009
@@ -652,7 +652,7 @@
try
{
List<Range> ranges = new ArrayList<Range>(differences);
- List<SSTableReader> sstables = CompactionManager.instance.submitAnti(cfstore, ranges, remote).get();
+ List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
Streaming.transferSSTables(remote, sstables, cf.left);
}
catch(Exception e)
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Dec 23 19:18:05 2009
@@ -18,13 +18,10 @@
*/
package org.apache.cassandra.db;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.ArrayUtils;
import static org.junit.Assert.assertNull;
import org.junit.Test;
@@ -119,7 +116,7 @@
Range r = new Range(partitioner.getToken("0"),
partitioner.getToken("zzzzzzz"));
ranges.add(r);
- List<SSTableReader> fileList = store.forceAntiCompaction(ranges, InetAddress.getByName("127.0.0.1"));
+ List<SSTableReader> fileList = CompactionManager.instance.submitAnticompaction(store, ranges, InetAddress.getByName("127.0.0.1")).get();
assert fileList.size() >= 1;
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Wed Dec 23 19:18:05 2009
@@ -45,6 +45,8 @@
@Test
public void testCompactions() throws IOException, ExecutionException,
InterruptedException
{
+ CompactionManager.instance.disableAutoCompaction();
+
// this test does enough rows to force multiple block indexes to be used
Table table = Table.open(TABLE1);
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
@@ -70,7 +72,7 @@
}
if (store.getSSTables().size() > 1)
{
- store.doCompaction(2, store.getSSTables().size());
+ CompactionManager.instance.submitMajor(store).get();
}
assertEquals(inserted.size(),
table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
}
@@ -78,7 +80,7 @@
@Test
public void testCompactionPurge() throws IOException, ExecutionException,
InterruptedException
{
- CompactionManager.instance.disableCompactions();
+ CompactionManager.instance.disableAutoCompaction();
Table table = Table.open(TABLE1);
String cfName = "Standard1";
@@ -117,12 +119,12 @@
rm.add(new QueryPath(cfName, null, "0".getBytes()), new byte[0], 0);
rm.apply();
store.forceBlockingFlush();
- store.doFileCompaction(sstablesIncomplete, Integer.MAX_VALUE);
+ CompactionManager.instance.doCompaction(store, sstablesIncomplete, CompactionManager.getDefaultGCBefore());
ColumnFamily cf =
table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key,
new QueryPath(cfName)));
assert cf.getColumnCount() == 10;
// major compact and test that all columns but the resurrected one is completely gone
- store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+ CompactionManager.instance.submitMajor(store, 0, Integer.MAX_VALUE).get();
cf = table.getColumnFamilyStore(cfName).getColumnFamily(new
IdentityQueryFilter(key, new QueryPath(cfName)));
assert cf.getColumnCount() == 1;
assert cf.getColumn(String.valueOf(5).getBytes()) != null;
@@ -131,7 +133,7 @@
@Test
public void testCompactionPurgeOneFile() throws IOException,
ExecutionException, InterruptedException
{
- CompactionManager.instance.disableCompactions();
+ CompactionManager.instance.disableAutoCompaction();
Table table = Table.open(TABLE1);
String cfName = "Standard2";
@@ -160,7 +162,7 @@
assert store.getSSTables().size() == 1 : store.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
// compact and test that the row is completely gone
- store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+ CompactionManager.instance.submitMajor(store, 0, Integer.MAX_VALUE).get();
assert store.getSSTables().isEmpty();
ColumnFamily cf =
table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key,
new QueryPath(cfName)));
assert cf == null : cf;
@@ -169,6 +171,8 @@
@Test
public void testCompactionReadonly() throws IOException,
ExecutionException, InterruptedException
{
+ CompactionManager.instance.disableAutoCompaction();
+
Table table = Table.open(TABLE2);
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
@@ -188,7 +192,7 @@
// perform readonly compaction and confirm that no sstables changed
ArrayList<SSTableReader> oldsstables = new
ArrayList<SSTableReader>(store.getSSTables());
- store.doReadonlyCompaction(LOCAL);
+ CompactionManager.instance.submitReadonly(store, LOCAL).get();
assertEquals(oldsstables, new
ArrayList<SSTableReader>(store.getSSTables()));
assertEquals(inserted.size(),
table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Wed Dec 23 19:18:05 2009
@@ -33,6 +33,8 @@
{
private void testCompaction(String columnFamilyName, int insertsPerTable)
throws IOException, ExecutionException, InterruptedException
{
+ CompactionManager.instance.disableAutoCompaction();
+
Table table = Table.open("Keyspace1");
ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName);
@@ -46,8 +48,7 @@
store.forceBlockingFlush();
assertEquals(inserted.size(),
table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "",
10000).keys.size());
}
- Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
- ft.get();
+ CompactionManager.instance.submitMajor(store).get();
assertEquals(1, store.getSSTables().size());
assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "",
10000).keys.size(), inserted.size());
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Wed Dec 23 19:18:05 2009
@@ -37,7 +37,7 @@
@Test
public void testWithFlush() throws IOException, ExecutionException,
InterruptedException
{
- CompactionManager.instance.disableCompactions();
+ CompactionManager.instance.disableAutoCompaction();
for (int i = 0; i < 100; i++)
{
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Wed Dec 23 19:18:05 2009
@@ -59,8 +59,7 @@
store.forceBlockingFlush();
validateRemoveTwoSources();
- Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
- ft.get();
+ CompactionManager.instance.submitMajor(store).get();
assertEquals(1, store.getSSTables().size());
validateRemoveCompacted();
}
@@ -144,8 +143,7 @@
store.forceBlockingFlush();
validateRemoveWithNewData();
- Future<Integer> ft = CompactionManager.instance.submitMinor(store, 2, 32);
- ft.get();
+ CompactionManager.instance.submitMajor(store).get();
assertEquals(1, store.getSSTables().size());
validateRemoveWithNewData();
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=893600&r1=893599&r2=893600&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Wed Dec 23 19:18:05 2009
@@ -359,7 +359,7 @@
// compact so we have a big row with more than the minimum index count
if (cfStore.getSSTables().size() > 1)
{
- cfStore.doCompaction(2, cfStore.getSSTables().size());
+ CompactionManager.instance.submitMajor(cfStore).get();
}
SSTableReader sstable = cfStore.getSSTables().iterator().next();
DecoratedKey decKey = sstable.getPartitioner().decorateKey(key);