avoid blocking additional writes during flush
patch by jbellis; reviewed by slebresne and tested by brandonwilliams for
CASSANDRA-1991


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4d7e7035
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4d7e7035
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4d7e7035

Branch: refs/heads/trunk
Commit: 4d7e703561bc68a79d856e28b3f710455b1c70bf
Parents: aead8da
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed May 9 14:51:24 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed May 9 14:52:06 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   12 +++---
 src/java/org/apache/cassandra/db/Memtable.java     |   12 +++---
 .../apache/cassandra/db/commitlog/CommitLog.java   |   19 +++--------
 .../db/compaction/LeveledCompactionStrategy.java   |   25 +++++++++++++--
 .../cassandra/db/compaction/LeveledManifest.java   |    9 +++++
 .../org/apache/cassandra/db/CommitLogTest.java     |    6 ++--
 7 files changed, 53 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d7e7035/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6e62c8..0888d29 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@
 
 
 1.1.1-dev
+ * avoid blocking additional writes during flush when the commitlog
+   gets behind temporarily (CASSANDRA-1991)
  * enable caching on index CFs based on data CF cache setting (CASSANDRA-4197)
  * warn on invalid replication strategy creation options (CASSANDRA-4046)
  * remove [Freeable]Memory finalizers (CASSANDRA-4222)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d7e7035/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ea9bf21..05eaa83 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,6 +30,7 @@ import java.util.regex.Pattern;
 import javax.management.*;
 
 import com.google.common.collect.*;
+import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -608,8 +609,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             }
 
             assert getMemtableThreadSafe() == oldMemtable;
-            final ReplayPosition ctx = writeCommitLog ? 
CommitLog.instance.getContext() : ReplayPosition.NONE;
-            logger.debug("flush position is {}", ctx);
+            final Future<ReplayPosition> ctx = writeCommitLog ? 
CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE);
 
             // submit the memtable for any indexed sub-cfses, and our own.
             final List<ColumnFamilyStore> icc = new 
ArrayList<ColumnFamilyStore>();
@@ -641,7 +641,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             // while keeping the wait-for-flush (future.get) out of anything 
latency-sensitive.
             return postFlushExecutor.submit(new WrappedRunnable()
             {
-                public void runMayThrow() throws InterruptedException, 
IOException
+                public void runMayThrow() throws InterruptedException, 
IOException, ExecutionException
                 {
                     latch.await();
 
@@ -661,7 +661,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                     {
                         // if we're not writing to the commit log, we are 
replaying the log, so marking
                         // the log header with "you can discard anything 
written before the context" is not valid
-                        
CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx);
+                        
CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx.get());
                     }
                 }
             });
@@ -1709,13 +1709,13 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         if (ksm.durableWrites)
         {
             CommitLog.instance.forceNewSegment();
-            ReplayPosition position = CommitLog.instance.getContext();
+            Future<ReplayPosition> position = CommitLog.instance.getContext();
             // now flush everyone else.  re-flushing ourselves is not 
necessary, but harmless
             for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
                 cfs.forceFlush();
             waitForActiveFlushes();
             // if everything was clean, flush won't have called discard
-            CommitLog.instance.discardCompletedSegments(metadata.cfId, 
position);
+            CommitLog.instance.discardCompletedSegments(metadata.cfId, 
position.get());
         }
 
         // sleep a little to make sure that our truncatedAt comes after any 
sstable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d7e7035/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java 
b/src/java/org/apache/cassandra/db/Memtable.java
index cea118a..da795a6 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -260,7 +260,7 @@ public class Memtable
     }
 
 
-    private SSTableReader writeSortedContents(ReplayPosition context) throws 
IOException
+    private SSTableReader writeSortedContents(Future<ReplayPosition> context) 
throws IOException, ExecutionException, InterruptedException
     {
         logger.info("Writing " + this);
 
@@ -277,7 +277,7 @@ public class Memtable
                                      * 1.2); // bloom filter and row index 
overhead
         SSTableReader ssTable;
         // errors when creating the writer that may leave empty temp files.
-        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), 
estimatedSize, context);
+        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), 
estimatedSize, context.get());
         try
         {
             // (we can't clear out the map as-we-go to free up memory,
@@ -303,16 +303,16 @@ public class Memtable
             writer.abort();
             throw FBUtilities.unchecked(e);
         }
-        logger.info(String.format("Completed flushing %s (%d bytes)",
-                                  ssTable.getFilename(), new 
File(ssTable.getFilename()).length()));
+        logger.info(String.format("Completed flushing %s (%d bytes) for 
commitlog position %s",
+                                  ssTable.getFilename(), new 
File(ssTable.getFilename()).length(), context.get()));
         return ssTable;
     }
 
-    public void flushAndSignal(final CountDownLatch latch, ExecutorService 
writer, final ReplayPosition context)
+    public void flushAndSignal(final CountDownLatch latch, ExecutorService 
writer, final Future<ReplayPosition> context)
     {
         writer.execute(new WrappedRunnable()
         {
-            public void runMayThrow() throws IOException
+            public void runMayThrow() throws Exception
             {
                 SSTableReader sstable = writeSortedContents(context);
                 cfs.replaceFlushed(Memtable.this, sstable);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d7e7035/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index ac8c251..055a32d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.*;
@@ -158,9 +159,10 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * @return the current ReplayPosition of the current segment file
+     * @return a Future representing a ReplayPosition such that when it is 
ready,
+     * all commitlog tasks enqueued prior to the getContext call will be 
complete (i.e., appended to the log)
      */
-    public ReplayPosition getContext()
+    public Future<ReplayPosition> getContext()
     {
         Callable<ReplayPosition> task = new Callable<ReplayPosition>()
         {
@@ -169,18 +171,7 @@ public class CommitLog implements CommitLogMBean
                 return activeSegment.getContext();
             }
         };
-        try
-        {
-            return executor.submit(task).get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+        return executor.submit(task);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d7e7035/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 939fdc6..12c0d48 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -168,21 +168,40 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy implem
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
-            byLevel.get(manifest.levelOf(sstable)).add(sstable);
+        {
+            int level = manifest.levelOf(sstable);
+            assert level >= 0;
+            byLevel.get(level).add(sstable);
+        }
 
         List<ICompactionScanner> scanners = new 
ArrayList<ICompactionScanner>(sstables.size());
         for (Integer level : byLevel.keySet())
         {
             if (level == 0)
             {
-                // L0 makes no guarantees about overlapping-ness.  Just create 
a direct scanner for each
+                // L0 makes no guarantees about overlapping-ness.  Just create 
a direct scanner for each.
                 for (SSTableReader sstable : byLevel.get(level))
                     scanners.add(sstable.getDirectScanner(range));
             }
             else
             {
                 // Create a LeveledScanner that only opens one sstable at a 
time, in sorted order
-                scanners.add(new LeveledScanner(byLevel.get(level), range));
+                ArrayList<SSTableReader> sstables1 = new 
ArrayList<SSTableReader>(byLevel.get(level));
+                scanners.add(new LeveledScanner(sstables1, range));
+
+                Collections.sort(sstables1, SSTable.sstableComparator);
+                SSTableReader previous = null;
+                for (SSTableReader sstable : sstables1)
+                {
+                    assert previous == null || 
sstable.first.compareTo(previous.last) > 0 : String.format("%s >= %s in %s and 
%s for %s in %s",
+                                                                               
                           previous.last,
+                                                                               
                           sstable.first,
+                                                                               
                           previous,
+                                                                               
                           sstable,
+                                                                               
                           sstable.getColumnFamilyName(),
+                                                                               
                           manifest.getLevel(level));
+                    previous = sstable;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d7e7035/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 2022949..07eb57b 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -184,6 +185,14 @@ public class LeveledManifest
         for (SSTableReader ssTableReader : added)
             add(ssTableReader, newLevel);
 
+        DecoratedKey last = null;
+        Collections.sort(generations[newLevel], SSTable.sstableComparator);
+        for (SSTableReader sstable : generations[newLevel])
+        {
+            assert last == null || sstable.first.compareTo(last) > 0;
+            last = sstable.last;
+        }
+
         serialize();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d7e7035/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java 
b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index e4b04a4..4e48d73 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -112,7 +112,7 @@ public class CommitLogTest extends SchemaLoader
         assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 
segments, got " + CommitLog.instance.activeSegments();
 
         int cfid2 = rm2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, 
CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid2, 
CommitLog.instance.getContext().get());
 
         // Assert we still have both our segment
         assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 
segments, got " + CommitLog.instance.activeSegments();
@@ -134,7 +134,7 @@ public class CommitLogTest extends SchemaLoader
 
         // "Flush": this won't delete anything
         int cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid1, 
CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid1, 
CommitLog.instance.getContext().get());
 
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 
segment, got " + CommitLog.instance.activeSegments();
 
@@ -152,7 +152,7 @@ public class CommitLogTest extends SchemaLoader
         // didn't write anything on cf1 since last flush (and we flush cf2)
 
         int cfid2 = rm2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, 
CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid2, 
CommitLog.instance.getContext().get());
 
         // Assert we still have both our segment
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 
segment, got " + CommitLog.instance.activeSegments();

Reply via email to