Author: gdusbabek
Date: Fri Nov 19 15:27:06 2010
New Revision: 1036888
URL: http://svn.apache.org/viewvc?rev=1036888&view=rev
Log:
compaction lock. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1715
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1036888&r1=1036887&r2=1036888&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri
Nov 19 15:27:06 2010
@@ -28,6 +28,9 @@ import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -57,6 +60,8 @@ public class CompactionManager implement
public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.db:type=CompactionManager";
private static final Logger logger =
LoggerFactory.getLogger(CompactionManager.class);
public static final CompactionManager instance;
+ private final ReentrantLock compactionLock = new ReentrantLock();
+ // todo: should provide a way to unlock in mbean?
static
{
@@ -74,6 +79,16 @@ public class CompactionManager implement
private CompactionExecutor executor = new CompactionExecutor();
private Map<ColumnFamilyStore, Integer> estimatedCompactions = new
NonBlockingHashMap<ColumnFamilyStore, Integer>();
+
+ public void lockCompactions()
+ {
+ compactionLock.lock();
+ }
+
+ public void unlockCompactions()
+ {
+ compactionLock.unlock();
+ }
/**
* Call this whenever a compaction might be needed on the given
columnfamily.
@@ -86,28 +101,36 @@ public class CompactionManager implement
{
public Integer call() throws IOException
{
- Integer minThreshold = cfs.getMinimumCompactionThreshold();
- Integer maxThreshold = cfs.getMaximumCompactionThreshold();
-
- if (minThreshold == 0 || maxThreshold == 0)
- {
- logger.debug("Compaction is currently disabled.");
- return 0;
- }
- logger.debug("Checking to see if compaction of " +
cfs.columnFamily + " would be useful");
- Set<List<SSTableReader>> buckets =
getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
- updateEstimateFor(cfs, buckets);
-
- for (List<SSTableReader> sstables : buckets)
+ compactionLock.lock();
+ try
{
- if (sstables.size() >= minThreshold)
+ Integer minThreshold = cfs.getMinimumCompactionThreshold();
+ Integer maxThreshold = cfs.getMaximumCompactionThreshold();
+
+ if (minThreshold == 0 || maxThreshold == 0)
+ {
+ logger.debug("Compaction is currently disabled.");
+ return 0;
+ }
+ logger.debug("Checking to see if compaction of " +
cfs.columnFamily + " would be useful");
+ Set<List<SSTableReader>> buckets =
getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
+ updateEstimateFor(cfs, buckets);
+
+ for (List<SSTableReader> sstables : buckets)
{
- // if we have too many to compact all at once, compact
older ones first -- this avoids
- // re-compacting files we just created.
- Collections.sort(sstables);
- return doCompaction(cfs, sstables.subList(0,
Math.min(sstables.size(), maxThreshold)), (int) (System.currentTimeMillis() /
1000) - cfs.metadata.gcGraceSeconds);
+ if (sstables.size() >= minThreshold)
+ {
+ // if we have too many to compact all at once,
compact older ones first -- this avoids
+ // re-compacting files we just created.
+ Collections.sort(sstables);
+ return doCompaction(cfs, sstables.subList(0,
Math.min(sstables.size(), maxThreshold)), (int) (System.currentTimeMillis() /
1000) - cfs.metadata.gcGraceSeconds);
+ }
}
}
+ finally
+ {
+ compactionLock.unlock();
+ }
return 0;
}
};
@@ -143,8 +166,16 @@ public class CompactionManager implement
{
public Object call() throws IOException
{
- doCleanupCompaction(cfStore);
- return this;
+ compactionLock.lock();
+ try
+ {
+ doCleanupCompaction(cfStore);
+ return this;
+ }
+ finally
+ {
+ compactionLock.unlock();
+ }
}
};
executor.submit(runnable).get();
@@ -161,25 +192,33 @@ public class CompactionManager implement
{
public Object call() throws IOException
{
- Collection<SSTableReader> sstables;
- if (skip > 0)
+ compactionLock.lock();
+ try
{
- sstables = new ArrayList<SSTableReader>();
- for (SSTableReader sstable : cfStore.getSSTables())
+ Collection<SSTableReader> sstables;
+ if (skip > 0)
{
- if (sstable.length() < skip * 1024L * 1024L * 1024L)
+ sstables = new ArrayList<SSTableReader>();
+ for (SSTableReader sstable : cfStore.getSSTables())
{
- sstables.add(sstable);
+ if (sstable.length() < skip * 1024L * 1024L *
1024L)
+ {
+ sstables.add(sstable);
+ }
}
}
+ else
+ {
+ sstables = cfStore.getSSTables();
+ }
+
+ doCompaction(cfStore, sstables, gcBefore);
+ return this;
}
- else
+ finally
{
- sstables = cfStore.getSSTables();
+ compactionLock.unlock();
}
-
- doCompaction(cfStore, sstables, gcBefore);
- return this;
}
};
return executor.submit(callable);
@@ -191,8 +230,16 @@ public class CompactionManager implement
{
public Object call() throws IOException
{
- doValidationCompaction(cfStore, validator);
- return this;
+ compactionLock.lock();
+ try
+ {
+ doValidationCompaction(cfStore, validator);
+ return this;
+ }
+ finally
+ {
+ compactionLock.unlock();
+ }
}
};
return executor.submit(callable);
@@ -495,13 +542,29 @@ public class CompactionManager implement
{
public void run()
{
- executor.beginCompaction(cfs, builder);
- builder.build();
+ compactionLock.lock();
+ try
+ {
+ executor.beginCompaction(cfs, builder);
+ builder.build();
+ }
+ finally
+ {
+ compactionLock.unlock();
+ }
}
};
- return executor.submit(runnable);
+
+ // don't submit to the executor if the compaction lock is held by the
current thread. Instead return a simple
+ // future that will be immediately get()ed and executed.
Happens during a migration, which locks
+ // the compaction thread and then reinitializes a ColumnFamilyStore.
Under normal circumstances, CFS spawns
+ // index jobs to the compaction manager (this) and blocks on them.
+ if (compactionLock.isHeldByCurrentThread())
+ return new SimpleFuture(runnable);
+ else
+ return executor.submit(runnable);
}
-
+
public Future<SSTableReader> submitSSTableBuild(Descriptor desc)
{
final SSTableWriter.Builder builder =
SSTableWriter.createBuilder(desc);
@@ -509,8 +572,16 @@ public class CompactionManager implement
{
public SSTableReader call() throws IOException
{
- executor.beginCompaction(builder.cfs, builder);
- return builder.build();
+ compactionLock.lock();
+ try
+ {
+ executor.beginCompaction(builder.cfs, builder);
+ return builder.build();
+ }
+ finally
+ {
+ compactionLock.unlock();
+ }
}
};
return executor.submit(callable);
@@ -681,4 +752,46 @@ public class CompactionManager implement
{
return executor.getCompletedTaskCount();
}
+
+ private class SimpleFuture implements Future
+ {
+ private Runnable runnable;
+
+ private SimpleFuture(Runnable r)
+ {
+ runnable = r;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ throw new IllegalStateException("May not call
SimpleFuture.cancel()");
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return runnable == null;
+ }
+
+ @Override
+ public Object get() throws InterruptedException, ExecutionException
+ {
+ runnable.run();
+ runnable = null;
+ return runnable;
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ throw new IllegalStateException("May not call
SimpleFuture.get(long, TimeUnit)");
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1036888&r1=1036887&r2=1036888&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Nov 19
15:27:06 2010
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.cassandra.config.CFMetaData;
@@ -121,6 +122,11 @@ public class Table
}
return tableInstance;
}
+
+ public static Lock getFlushLock()
+ {
+ return flusherLock.writeLock();
+ }
public static Table clear(String table) throws IOException
{