Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 5c6958462 -> 7e3f6151a


Ensure memtable flush cannot expire commit log entries from its future

patch by benedict; reviewed by aweisburg for CASSANDRA-8383


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e3f6151
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e3f6151
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e3f6151

Branch: refs/heads/cassandra-2.1
Commit: 7e3f6151abc96ceb1a2cac1bc117324c4de630e9
Parents: 5c69584
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Fri Dec 12 13:20:19 2014 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Fri Dec 12 13:20:19 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 34 ++++++++---
 .../org/apache/cassandra/db/DataTracker.java    |  5 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 62 ++++++++++++--------
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 5 files changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4cb6fb..18efc7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
  * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
  * Remove tmplink files for offline compactions (CASSANDRA-8321)
  * Reduce maxHintsInProgress (CASSANDRA-8415)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 99940b7..08f7969 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import javax.management.*;
 
@@ -910,12 +911,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        volatile ReplayPosition lastReplayPosition;
+        final ReplayPosition lastReplayPosition;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier)
+        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
         {
             this.writeBarrier = writeBarrier;
             this.flushSecondaryIndexes = flushSecondaryIndexes;
+            this.lastReplayPosition = lastReplayPosition;
         }
 
         public void run()
@@ -995,19 +997,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             memtables = new ArrayList<>();
 
             // submit flushes for the memtable for any indexed sub-cfses, and our own
-            final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+            AtomicReference<ReplayPosition> lastReplayPositionHolder = new AtomicReference<>();
             for (ColumnFamilyStore cfs : concatWithIndexes())
             {
                 // switch all memtables, regardless of their dirty status, setting the barrier
                 // so that we can reach a coordinated decision about cleanliness once they
                 // are no longer possible to be modified
                 Memtable mt = cfs.data.switchMemtable(truncate);
-                mt.setDiscarding(writeBarrier, minReplayPosition);
+                mt.setDiscarding(writeBarrier, lastReplayPositionHolder);
                 memtables.add(mt);
             }
 
+            // we now attempt to define the lastReplayPosition; we do this by grabbing the current limit from the CL
+            // and attempting to set the holder to this value. at the same time all writes to the memtables are
+            // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
+            // so that we know all operations prior to the position have not reached it yet
+            ReplayPosition lastReplayPosition;
+            while (true)
+            {
+                lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
+                ReplayPosition currentLast = lastReplayPositionHolder.get();
+                if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
+                    && lastReplayPositionHolder.compareAndSet(currentLast, lastReplayPosition))
+                    break;
+            }
+
+            // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
+            // since this happens after wiring up the lastReplayPosition, we also know all operations with earlier
+            // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier);
+            postFlush = new PostFlush(!truncate, writeBarrier, lastReplayPosition);
         }
 
         public void run()
@@ -1059,7 +1078,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.lastReplayPosition = memtables.get(0).getLastReplayPosition();
             postFlush.latch.countDown();
         }
     }
@@ -1131,8 +1149,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         long start = System.nanoTime();
 
-        Memtable mt = data.getMemtableFor(opGroup);
-        final long timeDelta = mt.put(key, columnFamily, indexer, opGroup, replayPosition);
+        Memtable mt = data.getMemtableFor(opGroup, replayPosition);
+        final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
         maybeUpdateRowCache(key);
         metric.writeLatency.addNano(System.nanoTime() - start);
         if(timeDelta < Long.MAX_VALUE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7df2b75..d086b47 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +55,7 @@ public class DataTracker
     }
 
     // get the Memtable that the ordered writeOp should be directed to
-    public Memtable getMemtableFor(OpOrder.Group opGroup)
+    public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
         // since any new memtables appended to the list after we fetch it will be for operations started
         // after us, we can safely assume that we will always find the memtable that 'accepts' us;
@@ -65,7 +66,7 @@ public class DataTracker
         // assign operations to a memtable that was retired/queued before we started)
         for (Memtable memtable : view.get().liveMemtables)
         {
-            if (memtable.accepts(opGroup))
+            if (memtable.accepts(opGroup, replayPosition))
                 return memtable;
         }
         throw new AssertionError(view.get().liveMemtables.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 3ae5da4..eb04bea 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -61,10 +61,17 @@ public class Memtable
     // the write barrier for directing writes to this memtable during a switch
     private volatile OpOrder.Barrier writeBarrier;
     // the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable
-    private final AtomicReference<ReplayPosition> lastReplayPosition = new AtomicReference<>();
+    private volatile AtomicReference<ReplayPosition> lastReplayPosition;
     // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
     private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
 
+    public static final class LastReplayPosition extends ReplayPosition
+    {
+        public LastReplayPosition(ReplayPosition copy) {
+            super(copy.segment, copy.position);
+        }
+    }
+
     // We index the memtable by RowPosition only for the purpose of being able
     // to select key range using Token.KeyBound. However put() ensures that we
     // actually only store DecoratedKey.
@@ -101,10 +108,10 @@ public class Memtable
         return currentOperations.get();
     }
 
-    void setDiscarding(OpOrder.Barrier writeBarrier, ReplayPosition minLastReplayPosition)
+    void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
     {
         assert this.writeBarrier == null;
-        this.lastReplayPosition.set(minLastReplayPosition);
+        this.lastReplayPosition = lastReplayPosition;
         this.writeBarrier = writeBarrier;
         allocator.setDiscarding();
     }
@@ -114,10 +121,34 @@ public class Memtable
         allocator.setDiscarded();
     }
 
-    public boolean accepts(OpOrder.Group opGroup)
+    // decide if this memtable should take the write, or if it should go to the next memtable
+    public boolean accepts(OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
+        // if the barrier hasn't been set yet, then this memtable is still taking ALL writes
         OpOrder.Barrier barrier = this.writeBarrier;
-        return barrier == null || barrier.isAfter(opGroup);
+        if (barrier == null)
+            return true;
+        // if the barrier has been set, but is in the past, we are definitely destined for a future memtable
+        if (!barrier.isAfter(opGroup))
+            return false;
+        // if we aren't durable we are directed only by the barrier
+        if (replayPosition == null)
+            return true;
+        while (true)
+        {
+            // otherwise we check if we are in the past/future wrt the CL boundary;
+            // if the boundary hasn't been finalised yet, we simply update it to the max of
+            // its current value and ours; if it HAS been finalised, we simply accept its judgement
+            // this permits us to coordinate a safe boundary, as the boundary choice is made
+            // atomically wrt our max() maintenance, so an operation cannot sneak into the past
+            ReplayPosition currentLast = lastReplayPosition.get();
+            if (currentLast instanceof LastReplayPosition)
+                return currentLast.compareTo(replayPosition) >= 0;
+            if (currentLast != null && currentLast.compareTo(replayPosition) >= 0)
+                return true;
+            if (lastReplayPosition.compareAndSet(currentLast, replayPosition))
+                return true;
+        }
     }
 
     public boolean isLive()
@@ -150,22 +181,8 @@ public class Memtable
      *
      * replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null
      */
-    long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
+    long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup)
     {
-        if (replayPosition != null && writeBarrier != null)
-        {
-            // if the writeBarrier is set, we want to maintain lastReplayPosition; this is an optimisation to avoid
-            // casing it for every write, but still ensure it is correct when writeBarrier.await() completes.
-            while (true)
-            {
-                ReplayPosition last = lastReplayPosition.get();
-                if (last.compareTo(replayPosition) >= 0)
-                    break;
-                if (lastReplayPosition.compareAndSet(last, replayPosition))
-                    break;
-            }
-        }
-
         AtomicBTreeColumns previous = rows.get(key);
 
         if (previous == null)
@@ -274,11 +291,6 @@ public class Memtable
         return creationTime;
     }
 
-    public ReplayPosition getLastReplayPosition()
-    {
-        return lastReplayPosition.get();
-    }
-
     class FlushRunnable extends DiskAwareRunnable
     {
         private final ReplayPosition context;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index ee9ca14..9b51a33 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -158,8 +158,8 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * @return a Future representing a ReplayPosition such that when it is ready,
-     * all Allocations created prior to the getContext call will be written to the log
+     * @return a ReplayPosition which, if >= one returned from add(), implies add() was started
+     * (but not necessarily finished) prior to this call
+     * (but not necessarily finished) prior to this call
      */
     public ReplayPosition getContext()
     {

Reply via email to