Ensure memtable book keeping is not corrupted in the event we shrink usage

patch by benedict; reviewed by tjake for CASSANDRA-9681


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b757db14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b757db14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b757db14

Branch: refs/heads/trunk
Commit: b757db1484473b264bf25ca5541f080d54a579a2
Parents: c5f03a9
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Thu Jul 2 10:27:07 2015 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Thu Jul 2 10:27:07 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/AtomicBTreeColumns.java |  2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 22 +++++-
 src/java/org/apache/cassandra/db/Memtable.java  | 15 ++--
 .../org/apache/cassandra/utils/FBUtilities.java | 10 +++
 .../apache/cassandra/utils/memory/HeapPool.java |  4 +-
 .../utils/memory/MemtableAllocator.java         | 39 +++++++----
 .../cassandra/utils/memory/MemtablePool.java    | 73 ++++++++++++--------
 .../utils/memory/NativeAllocatorTest.java       | 18 ++++-
 9 files changed, 132 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 762b88b..25f7c1d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.8
+ * Ensure memtable book keeping is not corrupted in the event we shrink usage (CASSANDRA-9681)
  * Update internal python driver for cqlsh (CASSANDRA-9064)
  * Fix IndexOutOfBoundsException when inserting tuple with too many
    elements using the string literal notation (CASSANDRA-9559)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 47f0b85..d9eb29c 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -505,7 +505,7 @@ public class AtomicBTreeColumns extends ColumnFamily
 
         protected void finish()
         {
-            allocator.onHeap().allocate(heapSize, writeOp);
+            allocator.onHeap().adjust(heapSize, writeOp);
             reclaimer.commit();
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index fa527c7..8e67cdc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -35,6 +35,7 @@ import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.utils.memory.MemtablePool;
 import org.json.simple.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1157,6 +1158,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             float largestRatio = 0f;
             Memtable largest = null;
+            float liveOnHeap = 0, liveOffHeap = 0;
             for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
             {
                 // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
@@ -1181,19 +1183,37 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 }
 
                 float ratio = Math.max(onHeap, offHeap);
-
                 if (ratio > largestRatio)
                 {
                     largest = current;
                     largestRatio = ratio;
                 }
+
+                liveOnHeap += onHeap;
+                liveOffHeap += offHeap;
             }
 
             if (largest != null)
+            {
+                float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio();
+                float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio();
+                float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio();
+                float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio();
+                float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio();
+                float thisOffHeap = largest.getAllocator().onHeap().ownershipRatio();
+                logger.info("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}",
+                            largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap),
+                            ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap));
                 largest.cfs.switchMemtableIfCurrent(largest);
+            }
         }
     }
 
+    private static String ratio(float onHeap, float offHeap)
+    {
+        return String.format("%.0f/%.0f", onHeap, offHeap);
+    }
+
     public void maybeUpdateRowCache(DecoratedKey key)
     {
         if (!isRowCacheEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index a50a614..9f6cf9b 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -239,7 +239,8 @@ public class Memtable
     public String toString()
     {
         return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
-                             cfs.name, hashCode(), liveDataSize, currentOperations, 100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
+                             cfs.name, hashCode(), FBUtilities.prettyPrintMemory(liveDataSize.get()), currentOperations,
+                             100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
     }
 
     /**
@@ -378,19 +379,21 @@ public class Memtable
 
                 if (writer.getFilePointer() > 0)
                 {
-                    writer.isolateReferences();
+                    logger.info(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                              writer.getFilename(),
+                                              FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
+                                              context));
 
+                    writer.isolateReferences();
                     // temp sstables should contain non-repaired data.
                     ssTable = writer.closeAndOpenReader();
-                    logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
-                                              ssTable.getFilename(), new File(ssTable.getFilename()).length(), context));
                 }
                 else
                 {
+                    logger.info("Completed flushing %s; nothing needed to be retained.  Commitlog position was {}",
+                                writer.getFilename(), context);
                     writer.abort();
                     ssTable = null;
-                    logger.info("Completed flushing; nothing needed to be retained.  Commitlog position was {}",
-                                context);
                 }
 
                 if (heavilyContendedRowCount > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 4c81b2a..68eb864 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -28,6 +28,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.text.NumberFormat;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -570,6 +571,15 @@ public class FBUtilities
         }
     }
 
+    public static String prettyPrintMemory(long size)
+    {
+        if (size >= 1 << 30)
+            return String.format("%.3fGiB", size / (double) (1 << 30));
+        if (size >= 1 << 20)
+            return String.format("%.3fMiB", size / (double) (1 << 20));
+        return String.format("%.3fKiB", size / (double) (1 << 10));
+    }
+
     /**
      * Starts and waits for the given @param pb to finish.
      * @throws java.io.IOException on non-zero exit code

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/utils/memory/HeapPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index a04947c..2a19d9c 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -75,13 +75,13 @@ public class HeapPool extends MemtablePool
 
             public Reclaimer reclaimImmediately(Cell cell)
             {
-                onHeap().release(cell.name().dataSize() + cell.value().remaining());
+                onHeap().released(cell.name().dataSize() + cell.value().remaining());
                 return this;
             }
 
             public Reclaimer reclaimImmediately(DecoratedKey key)
             {
-                onHeap().release(key.getKey().remaining());
+                onHeap().released(key.getKey().remaining());
                 return this;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index e814b4d..f5e743c 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -160,13 +160,24 @@ public abstract class MemtableAllocator
         // currently no corroboration/enforcement of this is performed.
         void releaseAll()
         {
-            parent.adjustAcquired(-ownsUpdater.getAndSet(this, 0), false);
-            parent.adjustReclaiming(-reclaimingUpdater.getAndSet(this, 0));
+            parent.released(ownsUpdater.getAndSet(this, 0));
+            parent.reclaimed(reclaimingUpdater.getAndSet(this, 0));
+        }
+
+        // like allocate, but permits allocations to be negative
+        public void adjust(long size, OpOrder.Group opGroup)
+        {
+            if (size <= 0)
+                released(-size);
+            else
+                allocate(size, opGroup);
         }
 
         // allocate memory in the tracker, and mark ourselves as owning it
         public void allocate(long size, OpOrder.Group opGroup)
         {
+            assert size >= 0;
+
             while (true)
             {
                 if (parent.tryAllocate(size))
@@ -190,23 +201,23 @@ public abstract class MemtableAllocator
             }
         }
 
-        // retroactively mark an amount allocated amd acquired in the tracker, and owned by us
-        void allocated(long size)
+        // retroactively mark an amount allocated and acquired in the tracker, and owned by us
+        private void allocated(long size)
         {
-            parent.adjustAcquired(size, true);
+            parent.allocated(size);
             ownsUpdater.addAndGet(this, size);
         }
 
         // retroactively mark an amount acquired in the tracker, and owned by us
-        void acquired(long size)
+        private void acquired(long size)
         {
-            parent.adjustAcquired(size, false);
+            parent.acquired(size);
             ownsUpdater.addAndGet(this, size);
         }
 
-        void release(long size)
+        void released(long size)
         {
-            parent.adjustAcquired(-size, false);
+            parent.released(size);
             ownsUpdater.addAndGet(this, -size);
         }
 
@@ -217,11 +228,11 @@ public abstract class MemtableAllocator
             {
                 long cur = owns;
                 long prev = reclaiming;
-                if (reclaimingUpdater.compareAndSet(this, prev, cur))
-                {
-                    parent.adjustReclaiming(cur - prev);
-                    return;
-                }
+                if (!reclaimingUpdater.compareAndSet(this, prev, cur))
+                    continue;
+
+                parent.reclaiming(cur - prev);
+                return;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 1d219bb..bb85884 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -130,10 +130,8 @@ public abstract class MemtablePool
          * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the
          * allocated total, we will signal waiters
          */
-        void adjustAllocated(long size)
+        private void adjustAllocated(long size)
         {
-            if (size == 0)
-                return;
             while (true)
             {
                 long cur = allocated;
@@ -142,38 +140,43 @@ public abstract class MemtablePool
             }
         }
 
-        // 'acquires' an amount of memory, and maybe also marks it allocated. This method is meant to be overridden
-        // by implementations with a separate concept of acquired/allocated. As this method stands, an acquire
-        // without an allocate is a no-op (acquisition is achieved through allocation), however a release (where size < 0)
-        // is always processed and accounted for in allocated.
-        void adjustAcquired(long size, boolean alsoAllocated)
+        void allocated(long size)
         {
-            if (size > 0 || alsoAllocated)
-            {
-                if (alsoAllocated)
-                    adjustAllocated(size);
-                maybeClean();
-            }
-            else if (size < 0)
-            {
-                adjustAllocated(size);
-                hasRoom.signalAll();
-            }
+            assert size >= 0;
+            if (size == 0)
+                return;
+
+            adjustAllocated(size);
+            maybeClean();
+        }
+
+        void acquired(long size)
+        {
+            maybeClean();
+        }
+
+        void released(long size)
+        {
+            assert size >= 0;
+            adjustAllocated(-size);
+            hasRoom.signalAll();
         }
 
-        // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans
-        void adjustReclaiming(long reclaiming)
+        void reclaiming(long size)
         {
-            if (reclaiming == 0)
+            if (size == 0)
                 return;
-            reclaimingUpdater.addAndGet(this, reclaiming);
-            if (reclaiming < 0 && updateNextClean() && cleaner != null)
-                cleaner.trigger();
+            reclaimingUpdater.addAndGet(this, size);
         }
 
-        public long allocated()
+        void reclaimed(long size)
         {
-            return allocated;
+            if (size == 0)
+                return;
+
+            reclaimingUpdater.addAndGet(this, -size);
+            if (updateNextClean() && cleaner != null)
+                cleaner.trigger();
         }
 
         public long used()
@@ -181,6 +184,22 @@ public abstract class MemtablePool
             return allocated;
         }
 
+        public float reclaimingRatio()
+        {
+            float r = reclaiming / (float) limit;
+            if (Float.isNaN(r))
+                return 0;
+            return r;
+        }
+
+        public float usedRatio()
+        {
+            float r = allocated / (float) limit;
+            if (Float.isNaN(r))
+                return 0;
+            return r;
+        }
+
         public MemtableAllocator.SubAllocator newAllocator()
         {
             return new MemtableAllocator.SubAllocator(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b757db14/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
index 83d6c0c..7f87fcd 100644
--- a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
@@ -56,7 +56,7 @@ public class NativeAllocatorTest
                     }
                     if (isClean.getCount() > 0)
                     {
-                        allocatorRef.get().offHeap().release(80);
+                        allocatorRef.get().offHeap().released(80);
                         isClean.countDown();
                     }
                 }
@@ -79,6 +79,22 @@ public class NativeAllocatorTest
                     // allocate normal, check accounted and not cleaned
                     allocator.allocate(10, group);
                     Assert.assertEquals(10, allocator.offHeap().owns());
+                    // confirm adjustment works
+                    allocator.offHeap().adjust(-10, group);
+                    Assert.assertEquals(0, allocator.offHeap().owns());
+                    allocator.offHeap().adjust(10, group);
+                    Assert.assertEquals(10, allocator.offHeap().owns());
+                    // confirm we cannot allocate negative
+                    boolean success = false;
+                    try
+                    {
+                        allocator.offHeap().allocate(-10, group);
+                    }
+                    catch (AssertionError e)
+                    {
+                        success = true;
+                    }
+                    Assert.assertTrue(success);
                     Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
                     Assert.assertEquals(1, isClean.getCount());
 

Reply via email to