Author: jbellis
Date: Wed May 20 14:40:27 2009
New Revision: 776714
URL: http://svn.apache.org/viewvc?rev=776714&view=rev
Log:
OneCompactionTest fails occasionally because 500 keys per CFS actually
triggers an automatic compaction (since the test flush threshold is only 20),
and we were calling the non-threadsafe doCompaction directly for convenience:
the failure occurs when our manual compaction begins mid-run of an automatic
one, and the automatic one deletes the original sstable file first. Fix by
(a) dropping the number of keys so that OneCompactionTest lives up to its name
(more are tested in "CompactionsTest") and (b) making the compaction call
threadsafe by refactoring to allow a threshold parameter to
MinorCompactionManager.submit.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-184
Modified:
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java
Modified:
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=776714&r1=776713&r2=776714&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed May 20 14:40:27 2009
@@ -56,7 +56,6 @@
{
private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
- private static int COMPACTION_THRESHOLD = 4; // compact this many sstables
at a time
private static final int BUFSIZE = 128 * 1024 * 1024;
private static final int COMPACTION_MEMORY_THRESHOLD = 1 << 30;
@@ -770,8 +769,8 @@
lock_.writeLock().unlock();
}
- if ((ssTableSize >= COMPACTION_THRESHOLD && !isCompacting_.get())
- || (isCompacting_.get() && ssTableSize % COMPACTION_THRESHOLD ==
0))
+ if ((ssTableSize >= MinorCompactionManager.COMPACTION_THRESHOLD &&
!isCompacting_.get())
+ || (isCompacting_.get() && ssTableSize %
MinorCompactionManager.COMPACTION_THRESHOLD == 0))
{
logger_.debug("Submitting for compaction ...");
MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
@@ -860,11 +859,6 @@
return buckets.keySet();
}
- public int doCompaction() throws IOException
- {
- return doCompaction(COMPACTION_THRESHOLD);
- }
-
/*
* Break the files into buckets and then compact.
*/
@@ -1294,6 +1288,7 @@
*/
private int doFileCompaction(List<String> files, int minBufferSize) throws
IOException
{
+ logger_.info("Compacting [" + StringUtils.join(files, ",") + "]");
String compactionFileLocation =
DatabaseDescriptor.getCompactionFileLocation(getExpectedCompactedFileSize(files));
// If the compaction file path is null that means we have no space
left for this compaction.
// try again w/o the largest one.
@@ -1314,6 +1309,7 @@
if (pq.isEmpty())
{
+ logger_.warn("Nothing to compact (all files empty or corrupt)");
// TODO clean out bad files, if any
return 0;
}
Modified:
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=776714&r1=776713&r2=776714&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java
(original)
+++
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java
Wed May 20 14:40:27 2009
@@ -46,7 +46,8 @@
private static MinorCompactionManager instance_;
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ =
Logger.getLogger(MinorCompactionManager.class);
- final static long intervalInMins_ = 5;
+ private static final long intervalInMins_ = 5;
+ static final int COMPACTION_THRESHOLD = 4; // compact this many sstables
at a time
public static MinorCompactionManager instance()
{
@@ -66,33 +67,6 @@
return instance_;
}
- class FileCompactor implements Callable<Integer>
- {
- private ColumnFamilyStore columnFamilyStore_;
-
- FileCompactor(ColumnFamilyStore columnFamilyStore)
- {
- columnFamilyStore_ = columnFamilyStore;
- }
-
- public Integer call()
- {
- logger_.debug("Started compaction ..." +
columnFamilyStore_.columnFamily_);
- try
- {
- return columnFamilyStore_.doCompaction();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- logger_.debug("Finished compaction ..." +
columnFamilyStore_.columnFamily_);
- }
- }
- }
-
class FileCompactor2 implements Callable<Boolean>
{
private ColumnFamilyStore columnFamilyStore_;
@@ -138,9 +112,9 @@
public void run()
{
- logger_.debug("Started Major compaction
..."+columnFamilyStore_.columnFamily_);
+ logger_.debug("Started Major compaction for " +
columnFamilyStore_.columnFamily_);
columnFamilyStore_.doMajorCompaction(skip_);
- logger_.debug("Finished Major compaction
..."+columnFamilyStore_.columnFamily_);
+ logger_.debug("Finished Major compaction for " +
columnFamilyStore_.columnFamily_);
}
}
@@ -183,22 +157,41 @@
public void submitPeriodicCompaction(final ColumnFamilyStore
columnFamilyStore)
{
- Runnable runnable = new Runnable() // having to wrap Callable in
Runnable is retarded but that's what the API insists on.
+ Runnable runnable = new Runnable()
{
public void run()
{
- new FileCompactor(columnFamilyStore).call();
+ try
+ {
+ columnFamilyStore.doCompaction(COMPACTION_THRESHOLD);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
};
compactor_.scheduleWithFixedDelay(runnable,
MinorCompactionManager.intervalInMins_,
MinorCompactionManager.intervalInMins_,
TimeUnit.MINUTES);
}
- public Future<Integer> submit(ColumnFamilyStore columnFamilyStore)
+ public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
{
- return compactor_.submit(new FileCompactor(columnFamilyStore));
+ return submit(columnFamilyStore, COMPACTION_THRESHOLD);
}
-
+
+ Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final
int threshold)
+ {
+ Callable<Integer> callable = new Callable<Integer>()
+ {
+ public Integer call() throws IOException
+ {
+ return columnFamilyStore.doCompaction(threshold);
+ }
+ };
+ return compactor_.submit(callable);
+ }
+
public void submitCleanup(ColumnFamilyStore columnFamilyStore)
{
compactor_.submit(new CleanupCompactor(columnFamilyStore));
Modified:
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java?rev=776714&r1=776713&r2=776714&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java
(original)
+++
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java
Wed May 20 14:40:27 2009
@@ -220,7 +220,7 @@
}
File file = new File(dataFile);
- assert file.exists();
+ assert file.exists() : "attempted to delete non-existing file " +
dataFile;
/* delete the data file */
if (!file.delete())
{
Modified:
incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=776714&r1=776713&r2=776714&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java
(original)
+++
incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java
Wed May 20 14:40:27 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.Set;
import java.util.HashSet;
@@ -44,7 +45,9 @@
store.forceBlockingFlush();
assertEquals(table.getKeyRange("", "", 10000).size(),
inserted.size());
}
- store.doCompaction(2);
+ Future<Integer> ft = MinorCompactionManager.instance().submit(store,
2);
+ ft.get();
+ assertEquals(1, store.getSSTableFilenames().size());
assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
}
@@ -57,6 +60,6 @@
@Test
public void testCompaction2() throws IOException, ExecutionException,
InterruptedException
{
- testCompaction("Standard2", 500);
+ testCompaction("Standard2", 5);
}
}