Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 46f7f84ce -> d13f78ab8
  refs/heads/cassandra-2.1 1ac72f637 -> 750235931


Fix potential SlabAllocator yield-starvation
patch by bes; reviewed by jbellis for CASSANDRA-7133


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d13f78ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d13f78ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d13f78ab

Branch: refs/heads/cassandra-2.0
Commit: d13f78ab82c6fe4a6fbb5293126670fb7032203f
Parents: 46f7f84
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu May 8 17:04:17 2014 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu May 8 17:04:17 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/utils/SlabAllocator.java   | 53 +++++---------------
 2 files changed, 15 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d13f78ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ce30239..2712398 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.9
+ * Fix potential SlabAllocator yield-starvation (CASSANDRA-7133)
  * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)
  * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177)
  * return all cpu values from BackgroundActivityMonitor.readAndCompute (CASSANDRA-7183)
@@ -7,6 +8,7 @@
  * Fix potential NumberFormatException when deserializing IntegerType (CASSANDRA-7088)
  * cqlsh can't tab-complete disabling compaction (CASSANDRA-7185)
 
+
 2.0.8
  * Correctly delete scheduled range xfers (CASSANDRA-7143)
  * Make batchlog replica selection rack-aware (CASSANDRA-6551)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d13f78ab/src/java/org/apache/cassandra/utils/SlabAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SlabAllocator.java b/src/java/org/apache/cassandra/utils/SlabAllocator.java
index dedf869..20939fe 100644
--- a/src/java/org/apache/cassandra/utils/SlabAllocator.java
+++ b/src/java/org/apache/cassandra/utils/SlabAllocator.java
@@ -18,11 +18,11 @@
 package org.apache.cassandra.utils;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +47,9 @@ public class SlabAllocator extends Allocator
     private final static int REGION_SIZE = 1024 * 1024;
     private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this 
don't go in the region
 
+    // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
+    private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new 
ConcurrentLinkedQueue<>();
+
     private final AtomicReference<Region> currentRegion = new 
AtomicReference<Region>();
     private final AtomicInteger regionCount = new AtomicInteger(0);
     private AtomicLong unslabbed = new AtomicLong(0);
@@ -92,19 +95,20 @@ public class SlabAllocator extends Allocator
                 return region;
 
             // No current region, so we want to allocate one. We race
-            // against other allocators to CAS in an uninitialized region
-            // (which is cheap to allocate)
-            region = new Region(REGION_SIZE);
+            // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
+            region = RACE_ALLOCATED.poll();
+            if (region == null)
+                region = new Region(REGION_SIZE);
             if (currentRegion.compareAndSet(null, region))
             {
-                // we won race - now we need to actually do the expensive allocation step
-                region.init();
                 regionCount.incrementAndGet();
                 logger.trace("{} regions now allocated in {}", regionCount, 
this);
                 return region;
             }
+
             // someone else won race - that's fine, we'll try to grab theirs
             // in the next iteration of the loop.
+            RACE_ALLOCATED.add(region);
         }
     }
 
@@ -129,24 +133,18 @@ public class SlabAllocator extends Allocator
         /**
          * Actual underlying data
          */
-        private ByteBuffer data;
+        private final ByteBuffer data;
 
-        private static final int UNINITIALIZED = -1;
         /**
          * Offset for the next allocation, or the sentinel value -1
          * which implies that the region is still uninitialized.
          */
-        private AtomicInteger nextFreeOffset = new 
AtomicInteger(UNINITIALIZED);
+        private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
 
         /**
          * Total number of allocations satisfied from this buffer
          */
-        private AtomicInteger allocCount = new AtomicInteger();
-
-        /**
-         * Size of region in bytes
-         */
-        private final int size;
+        private final AtomicInteger allocCount = new AtomicInteger();
 
         /**
          * Create an uninitialized region. Note that memory is not allocated yet, so
@@ -156,23 +154,7 @@ public class SlabAllocator extends Allocator
          */
         private Region(int size)
         {
-            this.size = size;
-        }
-
-        /**
-         * Actually claim the memory for this region. This should only be called from
-         * the thread that constructed the region. It is thread-safe against other
-         * threads calling alloc(), who will block until the allocation is complete.
-         */
-        public void init()
-        {
-            assert nextFreeOffset.get() == UNINITIALIZED;
             data = ByteBuffer.allocate(size);
-            assert data.remaining() == data.capacity();
-            // Mark that it's ready for use
-            boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
-            // We should always succeed the above CAS since only one thread calls init()!
-            Preconditions.checkState(initted, "Multiple threads tried to init 
same region");
         }
 
         /**
@@ -185,15 +167,6 @@ public class SlabAllocator extends Allocator
             while (true)
             {
                 int oldOffset = nextFreeOffset.get();
-                if (oldOffset == UNINITIALIZED)
-                {
-                    // The region doesn't have its data allocated yet.
-                    // Since we found this in currentRegion, we know that whoever
-                    // CAS-ed it there is allocating it right now. So spin-loop
-                    // shouldn't spin long!
-                    Thread.yield();
-                    continue;
-                }
 
                 if (oldOffset + size > data.capacity()) // capacity == 
remaining
                     return null;

Reply via email to