Author: jbellis
Date: Wed Aug 31 16:32:37 2011
New Revision: 1163688
URL: http://svn.apache.org/viewvc?rev=1163688&view=rev
Log:
fix race that allowed multiple simultaneous leveled compaction tasks
patch by jbellis; reviewed by Ben Coverston for CASSANDRA-3087
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163688&r1=1163687&r2=1163688&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Aug 31 16:32:37 2011
@@ -44,7 +44,7 @@
Thrift<->Avro conversion methods (CASSANDRA-3032)
* Add timeouts to client request schedulers (CASSANDRA-3079)
* Cli to use hashes rather than array of hashes for strategy options
(CASSANDRA-3081)
- * LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110)
+ * LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110, 3087)
* Improvements of the CLI `describe` command (CASSANDRA-2630)
* reduce window where dropped CF sstables may not be deleted (CASSANDRA-2942)
* Expose gossip/FD info to JMX (CASSANDRA-2806)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1163688&r1=1163687&r2=1163688&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Wed Aug 31 16:32:37 2011
@@ -66,7 +66,17 @@ public class CompactionManager implement
public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.db:type=CompactionManager";
private static final Logger logger =
LoggerFactory.getLogger(CompactionManager.class);
public static final CompactionManager instance;
- // acquire as read to perform a compaction, and as write to prevent
compactions
+
+ /**
+ * compactionLock has two purposes:
+ * - Compaction acquires its readLock so that multiple compactions can
happen simultaneously,
+ * but the KS/CF migrations acquire its writeLock, so they can be sure
no new SSTables will
+ * be created for a dropped CF posthumously. (Thus, compaction checks
CFS.isValid while the
+ * lock is acquired.)
+ * - "Special" compactions will acquire writeLock instead of readLock to
make sure that all
+ * other compaction activity is quiesced and they can grab ALL the
sstables to do something.
+ * TODO this is too big a hammer -- we should only care about quiescing
all for the given CFS.
+ */
private final ReentrantReadWriteLock compactionLock = new
ReentrantReadWriteLock();
static
@@ -143,7 +153,6 @@ public class CompactionManager implement
{
public Object call() throws IOException
{
- // acquire the write lock to schedule all sstables
compactionLock.writeLock().lock();
try
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1163688&r1=1163687&r2=1163688&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
Wed Aug 31 16:32:37 2011
@@ -5,6 +5,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -12,7 +13,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
@@ -27,6 +27,7 @@ public class LeveledCompactionStrategy e
private LeveledManifest manifest;
private final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
private final int maxSSTableSize;
+ private final AtomicReference<LeveledCompactionTask> task = new
AtomicReference<LeveledCompactionTask>();
public class ScheduledBackgroundCompaction implements Runnable
{
@@ -90,14 +91,21 @@ public class LeveledCompactionStrategy e
return manifest.getLevelSize(i);
}
- public synchronized List<AbstractCompactionTask> getBackgroundTasks(int
gcBefore)
+ public List<AbstractCompactionTask> getBackgroundTasks(int gcBefore)
{
+ LeveledCompactionTask currentTask = task.get();
+ if (currentTask != null && !currentTask.isDone())
+ return Collections.emptyList();
+
Collection<SSTableReader> sstables =
manifest.getCompactionCandidates();
logger.debug("CompactionManager candidates are {}",
StringUtils.join(sstables, ","));
if (sstables.isEmpty())
return Collections.emptyList();
- LeveledCompactionTask task = new LeveledCompactionTask(cfs, sstables,
gcBefore, this.maxSSTableSize);
- return Collections.<AbstractCompactionTask>singletonList(task);
+
+ LeveledCompactionTask newTask = new LeveledCompactionTask(cfs,
sstables, gcBefore, this.maxSSTableSize);
+ return task.compareAndSet(currentTask, newTask)
+ ? Collections.<AbstractCompactionTask>singletonList(newTask)
+ : Collections.<AbstractCompactionTask>emptyList();
}
public List<AbstractCompactionTask> getMaximalTasks(int gcBefore)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java?rev=1163688&r1=1163687&r2=1163688&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
Wed Aug 31 16:32:37 2011
@@ -1,9 +1,10 @@
package org.apache.cassandra.db.compaction;
+import java.io.IOException;
import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
@@ -11,6 +12,8 @@ public class LeveledCompactionTask exten
{
private final int sstableSizeInMB;
+ private final CountDownLatch latch = new CountDownLatch(1);
+
public LeveledCompactionTask(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, final int gcBefore, int sstableSizeInMB)
{
super(cfs, sstables, gcBefore);
@@ -18,6 +21,19 @@ public class LeveledCompactionTask exten
}
@Override
+ public int execute(CompactionManager.CompactionExecutorStatsCollector
collector) throws IOException
+ {
+ int n = super.execute(collector);
+ latch.countDown();
+ return n;
+ }
+
+ public boolean isDone()
+ {
+ return latch.getCount() == 0;
+ }
+
+ @Override
protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer,
long position)
{
return position > sstableSizeInMB * 1024 * 1024;