Author: gdusbabek
Date: Fri Nov 19 15:27:06 2010
New Revision: 1036888

URL: http://svn.apache.org/viewvc?rev=1036888&view=rev
Log:
compaction lock. patch by gdusbabek, reviewe by jbellis. CASSANDRA-1715

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1036888&r1=1036887&r2=1036888&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri 
Nov 19 15:27:06 2010
@@ -28,6 +28,9 @@ import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -57,6 +60,8 @@ public class CompactionManager implement
     public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.db:type=CompactionManager";
     private static final Logger logger = 
LoggerFactory.getLogger(CompactionManager.class);
     public static final CompactionManager instance;
+    private final ReentrantLock compactionLock = new ReentrantLock();
+    // todo: should provide a way to unlock in mbean?
 
     static
     {
@@ -74,6 +79,16 @@ public class CompactionManager implement
 
     private CompactionExecutor executor = new CompactionExecutor();
     private Map<ColumnFamilyStore, Integer> estimatedCompactions = new 
NonBlockingHashMap<ColumnFamilyStore, Integer>();
+    
+    public void lockCompactions()
+    {
+        compactionLock.lock();
+    }
+    
+    public void unlockCompactions()
+    {
+        compactionLock.unlock();
+    }
 
     /**
      * Call this whenever a compaction might be needed on the given 
columnfamily.
@@ -86,28 +101,36 @@ public class CompactionManager implement
         {
             public Integer call() throws IOException
             {
-                Integer minThreshold = cfs.getMinimumCompactionThreshold();
-                Integer maxThreshold = cfs.getMaximumCompactionThreshold();
-
-                if (minThreshold == 0 || maxThreshold == 0)
-                {
-                    logger.debug("Compaction is currently disabled.");
-                    return 0;
-                }
-                logger.debug("Checking to see if compaction of " + 
cfs.columnFamily + " would be useful");
-                Set<List<SSTableReader>> buckets = 
getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
-                updateEstimateFor(cfs, buckets);
-                
-                for (List<SSTableReader> sstables : buckets)
+                compactionLock.lock();
+                try
                 {
-                    if (sstables.size() >= minThreshold)
+                    Integer minThreshold = cfs.getMinimumCompactionThreshold();
+                    Integer maxThreshold = cfs.getMaximumCompactionThreshold();
+    
+                    if (minThreshold == 0 || maxThreshold == 0)
+                    {
+                        logger.debug("Compaction is currently disabled.");
+                        return 0;
+                    }
+                    logger.debug("Checking to see if compaction of " + 
cfs.columnFamily + " would be useful");
+                    Set<List<SSTableReader>> buckets = 
getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
+                    updateEstimateFor(cfs, buckets);
+                    
+                    for (List<SSTableReader> sstables : buckets)
                     {
-                        // if we have too many to compact all at once, compact 
older ones first -- this avoids
-                        // re-compacting files we just created.
-                        Collections.sort(sstables);
-                        return doCompaction(cfs, sstables.subList(0, 
Math.min(sstables.size(), maxThreshold)), (int) (System.currentTimeMillis() / 
1000) - cfs.metadata.gcGraceSeconds);
+                        if (sstables.size() >= minThreshold)
+                        {
+                            // if we have too many to compact all at once, 
compact older ones first -- this avoids
+                            // re-compacting files we just created.
+                            Collections.sort(sstables);
+                            return doCompaction(cfs, sstables.subList(0, 
Math.min(sstables.size(), maxThreshold)), (int) (System.currentTimeMillis() / 
1000) - cfs.metadata.gcGraceSeconds);
+                        }
                     }
                 }
+                finally 
+                {
+                    compactionLock.unlock();
+                }
                 return 0;
             }
         };
@@ -143,8 +166,16 @@ public class CompactionManager implement
         {
             public Object call() throws IOException
             {
-                doCleanupCompaction(cfStore);
-                return this;
+                compactionLock.lock();
+                try 
+                {
+                    doCleanupCompaction(cfStore);
+                    return this;
+                }
+                finally 
+                {
+                    compactionLock.unlock();
+                }
             }
         };
         executor.submit(runnable).get();
@@ -161,25 +192,33 @@ public class CompactionManager implement
         {
             public Object call() throws IOException
             {
-                Collection<SSTableReader> sstables;
-                if (skip > 0)
+                compactionLock.lock();
+                try
                 {
-                    sstables = new ArrayList<SSTableReader>();
-                    for (SSTableReader sstable : cfStore.getSSTables())
+                    Collection<SSTableReader> sstables;
+                    if (skip > 0)
                     {
-                        if (sstable.length() < skip * 1024L * 1024L * 1024L)
+                        sstables = new ArrayList<SSTableReader>();
+                        for (SSTableReader sstable : cfStore.getSSTables())
                         {
-                            sstables.add(sstable);
+                            if (sstable.length() < skip * 1024L * 1024L * 
1024L)
+                            {
+                                sstables.add(sstable);
+                            }
                         }
                     }
+                    else
+                    {
+                        sstables = cfStore.getSSTables();
+                    }
+    
+                    doCompaction(cfStore, sstables, gcBefore);
+                    return this;
                 }
-                else
+                finally 
                 {
-                    sstables = cfStore.getSSTables();
+                    compactionLock.unlock();
                 }
-
-                doCompaction(cfStore, sstables, gcBefore);
-                return this;
             }
         };
         return executor.submit(callable);
@@ -191,8 +230,16 @@ public class CompactionManager implement
         {
             public Object call() throws IOException
             {
-                doValidationCompaction(cfStore, validator);
-                return this;
+                compactionLock.lock();
+                try
+                {
+                    doValidationCompaction(cfStore, validator);
+                    return this;
+                }
+                finally
+                {
+                    compactionLock.unlock();
+                }
             }
         };
         return executor.submit(callable);
@@ -495,13 +542,29 @@ public class CompactionManager implement
         {
             public void run()
             {
-                executor.beginCompaction(cfs, builder);
-                builder.build();
+                compactionLock.lock();
+                try
+                {
+                    executor.beginCompaction(cfs, builder);
+                    builder.build();
+                }
+                finally
+                {
+                    compactionLock.unlock();
+                }
             }
         };
-        return executor.submit(runnable);
+        
+        // don't submit to the executor if the compaction lock is held by the 
current thread. Instead return a simple
+        // future that will be immediately immediately get()ed and executed. 
Happens during a migration, which locks
+        // the compaction thread and then reinitializes a ColumnFamilyStore. 
Under normal circumstances, CFS spawns
+        // index jobs to the compaction manager (this) and blocks on them.
+        if (compactionLock.isHeldByCurrentThread())
+            return new SimpleFuture(runnable);
+        else
+            return executor.submit(runnable);
     }
-
+    
     public Future<SSTableReader> submitSSTableBuild(Descriptor desc)
     {
         final SSTableWriter.Builder builder = 
SSTableWriter.createBuilder(desc);
@@ -509,8 +572,16 @@ public class CompactionManager implement
         {
             public SSTableReader call() throws IOException
             {
-                executor.beginCompaction(builder.cfs, builder);
-                return builder.build();
+                compactionLock.lock();
+                try
+                {
+                    executor.beginCompaction(builder.cfs, builder);
+                    return builder.build();
+                }
+                finally
+                {
+                    compactionLock.unlock();
+                }
             }
         };
         return executor.submit(callable);
@@ -681,4 +752,46 @@ public class CompactionManager implement
     {
         return executor.getCompletedTaskCount();
     }
+    
+    private class SimpleFuture implements Future
+    {
+        private Runnable runnable;
+        
+        private SimpleFuture(Runnable r) 
+        {
+            runnable = r;
+        }
+        
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning)
+        {
+            throw new IllegalStateException("May not call 
SimpleFuture.cancel()");
+        }
+
+        @Override
+        public boolean isCancelled()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isDone()
+        {
+            return runnable == null;
+        }
+
+        @Override
+        public Object get() throws InterruptedException, ExecutionException
+        {
+            runnable.run();
+            runnable = null;
+            return runnable;
+        }
+
+        @Override
+        public Object get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException
+        {
+            throw new IllegalStateException("May not call 
SimpleFuture.get(long, TimeUnit)");
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1036888&r1=1036887&r2=1036888&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Nov 19 
15:27:06 2010
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -121,6 +122,11 @@ public class Table
         }
         return tableInstance;
     }
+    
+    public static Lock getFlushLock()
+    {
+        return flusherLock.writeLock();
+    }
 
     public static Table clear(String table) throws IOException
     {


Reply via email to