Author: jbellis
Date: Wed Sep 23 01:57:15 2009
New Revision: 817925

URL: http://svn.apache.org/viewvc?rev=817925&view=rev
Log:
Support multiple flush threads safely. Automatically use up to as many flush
threads as there are available processor cores. Pause updates when too many
unflushed memtables are generated.
patch by jbellis; reviewed by goffinet for CASSANDRA-401

Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Sep 23 01:57:15 2009
@@ -299,12 +299,9 @@
   <GCGraceSeconds>864000</GCGraceSeconds>
 
   <!--
-   ~ Number of threads to run when flushing memtables to disk.  Set this to
-   ~ the number of disks you physically have in your machine allocated for 
DataDirectory * 2. 
-   ~ If you are planning to use the Binary Memtable, its recommended to 
increase the max threads
-   ~ to maintain a higher quality of service while under load when normal 
memtables are flushing to disk. 
+   ~ The threshold size in megabytes the binary memtable must grow to,
+   ~ before it's submitted for flushing to disk.
   -->
-  <FlushMinThreads>1</FlushMinThreads>
-  <FlushMaxThreads>1</FlushMaxThreads>
+  <BinaryMemtableSizeInMB>256</BinaryMemtableSizeInMB>
 
 </Storage>

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 Wed Sep 23 01:57:15 2009
@@ -74,8 +74,7 @@
     private static int slicedReadBufferSizeInKB_ = 64;
     private static List<String> tables_ = new ArrayList<String>();
     private static Set<String> applicationColumnFamilies_ = new 
HashSet<String>();
-    private static int flushMinThreads_ = 1;
-    private static int flushMaxThreads_ = 1;
+    private static int bmtThreshold_ = 256;
 
     // Default descriptive names for introspection. The user can override
     // these choices in the config file. These are not case sensitive.
@@ -272,16 +271,10 @@
                 slicedReadBufferSizeInKB_ = Integer.parseInt(rawSlicedBuffer);
             }
 
-            String rawflushMinThreads = 
xmlUtils.getNodeValue("/Storage/FlushMinThreads");
-            if (rawflushMinThreads != null)
+            String bmtThreshold = 
xmlUtils.getNodeValue("/Storage/BinaryMemtableSizeInMB");
+            if (bmtThreshold != null)
             {
-                flushMinThreads_ = Integer.parseInt(rawflushMinThreads);
-            }
-
-            String rawflushMaxThreads = 
xmlUtils.getNodeValue("/Storage/FlushMaxThreads");
-            if (rawflushMaxThreads != null)
-            {
-                flushMaxThreads_ = Integer.parseInt(rawflushMaxThreads);
+                bmtThreshold_ = Integer.parseInt(bmtThreshold);
             }
 
             /* TCP port on which the storage system listens */
@@ -995,14 +988,8 @@
         return slicedReadBufferSizeInKB_;
     }
 
-    public static int getFlushMinThreads()
+    public static int getBMTThreshold()
     {
-        return flushMinThreads_;
+        return bmtThreshold_;
     }
-
-    public static int getFlushMaxThreads()
-    {
-        return flushMaxThreads_;
-    }
-
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Wed Sep 23 01:57:15 2009
@@ -28,7 +28,6 @@
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.log4j.Logger;
 
@@ -55,8 +54,14 @@
     private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
 
     private static NonBlockingHashMap<String, Set<Memtable>> 
memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
-    private static ExecutorService flusher_ = new 
DebuggableThreadPoolExecutor(DatabaseDescriptor.getFlushMinThreads(), 
DatabaseDescriptor.getFlushMaxThreads(), Integer.MAX_VALUE, TimeUnit.SECONDS, 
new LinkedBlockingQueue<Runnable>(), new 
ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
-    
+    private static DebuggableThreadPoolExecutor flusher_ = new 
DebuggableThreadPoolExecutor(1,
+                                                                               
             Runtime.getRuntime().availableProcessors(),
+                                                                               
             Integer.MAX_VALUE,
+                                                                               
             TimeUnit.SECONDS,
+                                                                               
             new LinkedBlockingQueue<Runnable>(2 * 
Runtime.getRuntime().availableProcessors()),
+                                                                               
             new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
+    private static ExecutorService commitLogUpdater_ = new 
DebuggableThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+
     private final String table_;
     public final String columnFamily_;
     private final boolean isSuper_;
@@ -317,9 +322,8 @@
                              columnFamily_, SSTable.TEMPFILE_MARKER, 
fileIndexGenerator_.incrementAndGet());
     }
 
-    void switchMemtable(Memtable oldMemtable)
+    Future<?> switchMemtable(Memtable oldMemtable) throws IOException
     {
-        CommitLog.CommitLogContext ctx = null;
         /**
          *  If we can get the writelock, that means no new updates can come in 
and 
          *  all ongoing updates to memtables have completed. We can get the 
tail
@@ -330,34 +334,43 @@
         Table.flusherLock_.writeLock().lock();
         try
         {
-            try
-            {
-                ctx = CommitLog.open().getContext();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
+            final CommitLog.CommitLogContext ctx = 
CommitLog.open().getContext();
 
             if (oldMemtable.isFrozen())
             {
-                return;
+                return null;
             }
             logger_.info(columnFamily_ + " has reached its threshold; 
switching in a fresh Memtable");
             oldMemtable.freeze();
             getMemtablesPendingFlushNotNull(columnFamily_).add(oldMemtable); 
// it's ok for the MT to briefly be both active and pendingFlush
-            submitFlush(oldMemtable, ctx);
+            final Future<?> future = submitFlush(oldMemtable, ctx);
             memtable_ = new Memtable(table_, columnFamily_);
-
-            if (memtableSwitchCount == Integer.MAX_VALUE)
+            // a second executor that makes sure the onMemtableFlushes get 
called in the right order,
+            // while keeping the wait-for-flush (future.get) out of anything 
latency-sensitive.
+            return commitLogUpdater_.submit(new Runnable()
             {
-                memtableSwitchCount = 0;
-            }
-            memtableSwitchCount++;
+                public void run()
+                {
+                    try
+                    {
+                        future.get();
+                        onMemtableFlush(ctx);
+                    }
+                    catch (Exception e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
         }
         finally
         {
             Table.flusherLock_.writeLock().unlock();
+            if (memtableSwitchCount == Integer.MAX_VALUE)
+            {
+                memtableSwitchCount = 0;
+            }
+            memtableSwitchCount++;
         }
     }
 
@@ -367,27 +380,20 @@
         binaryMemtable_.get().put(key, buffer);
     }
 
-    public void forceFlush()
+    public Future<?> forceFlush() throws IOException
     {
         if (memtable_.isClean())
-            return;
+            return null;
 
-        switchMemtable(memtable_);
+        return switchMemtable(memtable_);
     }
 
     void forceBlockingFlush() throws IOException, ExecutionException, 
InterruptedException
     {
         Memtable oldMemtable = getMemtableThreadSafe();
-        forceFlush();
-        // block for flush to finish by adding a no-op action to the flush 
executorservice
-        // and waiting for that to finish.  (this works since flush ES is 
single-threaded.)
-        Future f = flusher_.submit(new Runnable()
-        {
-            public void run()
-            {
-            }
-        });
-        f.get();
+        Future<?> future = forceFlush();
+        if (future != null)
+            future.get();
         /* this assert is not threadsafe -- the memtable could have been clean 
when forceFlush
            checked it, but dirty now thanks to another thread.  But as long as 
we are only
            calling this from single-threaded test code it is useful to have as 
a sanity check. */
@@ -882,10 +888,10 @@
     }
 
     /* Submit memtables to be flushed to disk */
-    private static void submitFlush(final Memtable memtable, final 
CommitLog.CommitLogContext cLogCtx)
+    private static Future<?> submitFlush(final Memtable memtable, final 
CommitLog.CommitLogContext cLogCtx)
     {
         logger_.info("Enqueuing flush of " + memtable);
-        flusher_.submit(new Runnable()
+        return flusher_.submit(new Runnable()
         {
             public void run()
             {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
 Wed Sep 23 01:57:15 2009
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.db;
 
+import java.util.concurrent.Future;
+import java.io.IOException;
+
 /**
  * The MBean interface for ColumnFamilyStore
  */
@@ -54,7 +57,7 @@
     /**
      * Triggers an immediate memtable flush.
      */
-    public void forceFlush();
+    public Future<?> forceFlush() throws IOException;
 
     /**
      * @return the number of read operations on this column family in the last 
minute

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java 
Wed Sep 23 01:57:15 2009
@@ -219,7 +219,6 @@
             }
         }
         SSTableReader ssTable = writer.closeAndOpenReader();
-        cfStore.onMemtableFlush(cLogCtx);
         cfStore.addSSTable(ssTable);
         buffer.close();
         isFlushed_ = true;

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java?rev=817925&r1=817924&r2=817925&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
 Wed Sep 23 01:57:15 2009
@@ -22,6 +22,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.io.IOException;
 
 import org.apache.log4j.Logger;
 
@@ -62,7 +63,14 @@
         {
             public void run()
             {
-                columnFamilyStore.forceFlush();
+                try
+                {
+                    columnFamilyStore.forceFlush();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
         };
         flusher_.scheduleWithFixedDelay(runnable, flushPeriodInMinutes, 
flushPeriodInMinutes, TimeUnit.MINUTES);       


Reply via email to