Updates for memory issues (WIP)

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/70dddc54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/70dddc54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/70dddc54

Branch: refs/heads/master
Commit: 70dddc54a73183e58f5493b13b1b19e51162f752
Parents: e80c32e
Author: Jacques Nadeau <[email protected]>
Authored: Mon Mar 3 22:22:59 2014 -0800
Committer: Steven Phillips <[email protected]>
Committed: Sun May 4 11:55:52 2014 -0700

----------------------------------------------------------------------
 .../main/java/io/netty/buffer/PoolArenaL.java   |  7 +-
 .../netty/buffer/PooledByteBufAllocatorL.java   |  7 ++
 .../drill/exec/memory/AccountingByteBuf.java    |  4 +-
 .../org/apache/drill/exec/memory/Accountor.java | 11 ++-
 .../drill/exec/memory/AtomicRemainder.java      | 93 ++++++++++++--------
 .../drill/exec/memory/TopLevelAllocator.java    | 40 ++++++---
 .../drill/exec/rpc/ProtobufLengthDecoder.java   |  2 +-
 .../exec/work/batch/ControlHandlerImpl.java     |  2 +-
 8 files changed, 104 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
----------------------------------------------------------------------
diff --git a/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java 
b/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
index aba2226..479fa80 100644
--- a/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
+++ b/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
@@ -125,13 +125,10 @@ abstract class PoolArenaL<T> {
     
     /**
      * Allocate a buffer from the current arena.
-     * Unlike netty.io buffers, this buffer can grow without bounds,
-     * but it will throw an exception if growth involves copying a page 
-     * or more of data. Instead of being an upper bounds sanity check,
+     * Instead of being an upper bounds sanity check,
      * the "max" capacity is used to opportunistically allocate extra memory.
      * Later, the capacity can be reduced very efficiently.
-     * To avoid excessive copying, a buffer cannot grow if it must copy
-     * more than a single page of data.
+     * 
      * @param cache   TODO: not sure
      * @param minRequested  The smallest capacity buffer we want
      * @param maxRequested  If convenient, allocate up to this capacity

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git 
a/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java 
b/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index 85522c1..bc2b137 100644
--- a/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -228,6 +228,13 @@ public class PooledByteBufAllocatorL extends 
AbstractByteBufAllocator {
         }
     }
 
+    @Override
+    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+        if (initialCapacity == 0 && maxCapacity == 0) {
+            return newDirectBuffer(0,0);
+        }
+        return super.directBuffer(initialCapacity, maxCapacity);
+    }
     
     /**
      * Override the abstract allocator. Normally, the abstract allocator

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
index 4df209f..f2d695e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
@@ -46,7 +46,7 @@ public class AccountingByteBuf extends ByteBuf{
     super();
     this.b = b;
     this.acct = a;
-    this.size = b.maxCapacity();
+    this.size = b.capacity();
   }
 
   @Override
@@ -83,7 +83,7 @@ public class AccountingByteBuf extends ByteBuf{
       return this;
     }else if(newCapacity < size){
       b.capacity(newCapacity);
-      int diff = size - b.maxCapacity();
+      int diff = size - b.capacity();
       acct.releasePartial(this, diff);
       this.size = size - diff;
       return this;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index bd40da3..0d19340 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -62,18 +62,20 @@ public class Accountor {
   }
 
   public boolean reserve(long size) {
-    return remainder.get(size);
+    //TODO: for now, we won't stop reservation.
+    remainder.get(size);
+    return true;
   }
 
   public void forceAdditionalReservation(long size) {
-    remainder.forceGet(size);
+    if(size > 0) remainder.forceGet(size);
   }
 
   public void reserved(long expected, AccountingByteBuf buf){
     // make sure to take away the additional memory that happened due to 
rounding.
 
     long additional = buf.capacity() - expected;
-    remainder.forceGet(additional);
+    if(additional > 0) remainder.forceGet(additional);
 
     if (ENABLE_ACCOUNTING) {
       buffers.put(buf, new DebugStackTrace(buf.capacity(), 
Thread.currentThread().getStackTrace()));
@@ -103,6 +105,7 @@ public class Accountor {
   }
 
   public void close() {
+     
     if (ENABLE_ACCOUNTING && !buffers.isEmpty()) {
       StringBuffer sb = new StringBuffer();
       sb.append("Attempted to close accountor with ");
@@ -144,7 +147,7 @@ public class Accountor {
     }
 
     remainder.close();
-
+    
   }
 
   private class DebugStackTrace {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 95e57d2..8476b53 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -27,27 +27,30 @@ import java.util.concurrent.atomic.AtomicLong;
 public class AtomicRemainder {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AtomicRemainder.class);
 
+  private static final boolean DEBUG = true;
+
   private final AtomicRemainder parent;
-  private final AtomicLong total;
-  private final AtomicLong unaccountable;
-  private final long max;
-  private final long pre;
-  private boolean closed = false;
+  private final AtomicLong availableShared;
+  private final AtomicLong availablePrivate;
+  private final long initTotal;
+  private final long initShared;
+  private final long initPrivate;
 
   public AtomicRemainder(AtomicRemainder parent, long max, long pre) {
     this.parent = parent;
-    this.total = new AtomicLong(max - pre);
-    this.unaccountable = new AtomicLong(pre);
-    this.max = max;
-    this.pre = pre;
+    this.availableShared = new AtomicLong(max - pre);
+    this.availablePrivate = new AtomicLong(pre);
+    this.initTotal = max;
+    this.initShared = max - pre;
+    this.initPrivate = pre;
   }
 
   public long getRemainder() {
-    return total.get() + unaccountable.get();
+    return availableShared.get() + availablePrivate.get();
   }
 
   public long getUsed() {
-    return max - getRemainder();
+    return initTotal - getRemainder();
   }
 
   /**
@@ -57,41 +60,54 @@ public class AtomicRemainder {
    * @param size
    */
   public void forceGet(long size) {
-    total.addAndGet(size);
+    if (DEBUG)
+      logger.info("Force get {}", size);
+    availableShared.addAndGet(size);
     if (parent != null)
       parent.forceGet(size);
   }
 
   public boolean get(long size) {
-    if (unaccountable.get() < 1) {
+    if (DEBUG)
+      logger.info("Get {}", size);
+    if (availablePrivate.get() < 1) {
       // if there is no preallocated memory, we can operate normally.
-      long outcome = total.addAndGet(-size);
+
+      // attempt to get shared memory, if fails, return false.
+      long outcome = availableShared.addAndGet(-size);
       if (outcome < 0) {
-        total.addAndGet(size);
+        availableShared.addAndGet(size);
         return false;
       } else {
         return true;
       }
+
     } else {
       // if there is preallocated memory, use that first.
-      long unaccount = unaccountable.getAndAdd(-size);
-      if (unaccount > -1) {
+      long unaccount = availablePrivate.addAndGet(-size);
+      if (unaccount >= 0) {
         return true;
       } else {
 
+        long additionalSpaceNeeded = -unaccount;
         // if there is a parent allocator, check it before allocating.
-        if (parent != null && !parent.get(-unaccount)) {
-          unaccountable.getAndAdd(size);
+        if (parent != null && !parent.get(additionalSpaceNeeded)) {
+          // parent allocation failed, return space to private pool.
+          availablePrivate.getAndAdd(size);
           return false;
         }
 
-        long account = total.addAndGet(unaccount);
+        // we got space from parent pool. let's make sure we have space locally 
available.
+        long account = availableShared.addAndGet(-additionalSpaceNeeded);
         if (account >= 0) {
-          unaccountable.getAndAdd(unaccount);
+          // we were successful, move private back to zero (since we allocated 
using shared).
+          availablePrivate.addAndGet(additionalSpaceNeeded);
           return true;
         } else {
-          unaccountable.getAndAdd(size);
-          total.addAndGet(-unaccount);
+          // we failed to get space from available shared. Return allocations 
to initial state.
+          availablePrivate.addAndGet(size);
+          availableShared.addAndGet(additionalSpaceNeeded);
+          parent.returnAllocation(additionalSpaceNeeded);
           return false;
         }
       }
@@ -106,20 +122,27 @@ public class AtomicRemainder {
    * @param size
    */
   public void returnAllocation(long size) {
-    long preSize = unaccountable.get();
-    long preChange = Math.min(size, pre - preSize);
-    long totalChange = size - preChange;
-    unaccountable.addAndGet(preChange);
-    total.addAndGet(totalChange);
-    if (parent != null){
-      parent.returnAllocation(totalChange);
+    if (DEBUG)
+      logger.info("Return allocation {}", size);
+    long privateSize = availablePrivate.get();
+    long privateChange = Math.min(size, initPrivate - privateSize);
+    long sharedChange = size - privateChange;
+    availablePrivate.addAndGet(privateChange);
+    availableShared.addAndGet(sharedChange);
+    if (parent != null) {
+      parent.returnAllocation(sharedChange);
     }
   }
 
-  public void close(){
-    if(!closed){
-      closed = true;
-//      if(parent != null) parent.returnAllocation(pre);
-    }
+  public void close() {
+    
+    if (availablePrivate.get() != initPrivate || availableShared.get() != 
initShared)
+      throw new IllegalStateException(
+          String
+              .format(ERROR, initPrivate, availablePrivate.get(), initPrivate 
- availablePrivate.get(), initShared, availableShared.get(), initShared - 
availableShared.get()));
+    
+    if(parent != null) parent.returnAllocation(initPrivate);
   }
+
+  static final String ERROR = "Failure while closing accountor.  Expected 
private and shared pools to be set to initial values.  However, one or more 
were not.  Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d 
\n\tshared\t%d\t%d\t%d.";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 108eaec..e71c9c9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -22,12 +22,18 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocatorL;
 import io.netty.buffer.PooledUnsafeDirectByteBufL;
 
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.util.AssertionUtil;
 
 public class TopLevelAllocator implements BufferAllocator {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class);
 
+  private static final boolean ENABLE_ACCOUNTING = 
AssertionUtil.isAssertionsEnabled();
+  private final Set<ChildAllocator> children;
   private final PooledByteBufAllocatorL innerAllocator = new 
PooledByteBufAllocatorL(true);
   private final Accountor acct;
 
@@ -37,14 +43,14 @@ public class TopLevelAllocator implements BufferAllocator {
   
   public TopLevelAllocator(long maximumAllocation) {
     this.acct = new Accountor(null, null, maximumAllocation, 0);
+    this.children = ENABLE_ACCOUNTING ? new HashSet<ChildAllocator>() : null; 
   }
 
   public AccountingByteBuf buffer(int min, int max) {
     if(!acct.reserve(min)) return null;
     ByteBuf buffer = innerAllocator.directBuffer(min, max);
-    if(buffer.maxCapacity() > max) buffer.capacity(max);
     AccountingByteBuf wrapped = new AccountingByteBuf(acct, 
(PooledUnsafeDirectByteBufL) buffer);
-    acct.reserved(buffer.maxCapacity(), wrapped);
+    acct.reserved(buffer.capacity() - min, wrapped);
     return wrapped;
   }
   
@@ -68,34 +74,37 @@ public class TopLevelAllocator implements BufferAllocator {
     if(!acct.reserve(initialReservation)){
       throw new OutOfMemoryException(String.format("You attempted to create a 
new child allocator with initial reservation %d but only %d bytes of memory 
were available.", initialReservation, acct.getCapacity() - 
acct.getAllocation()));
     };
-    return new ChildAllocator(handle, acct, initialReservation, 
maximumReservation);
+    ChildAllocator allocator = new ChildAllocator(handle, acct, 
initialReservation, maximumReservation);
+    if(ENABLE_ACCOUNTING) children.add(allocator);
+    return allocator;
   }
 
   @Override
   public void close() {
+    if(ENABLE_ACCOUNTING && !children.isEmpty()){
+      throw new IllegalStateException("Failure while trying to close 
allocator: Child level allocators not closed.");
+    }
     acct.close();
   }
 
   
   private class ChildAllocator implements BufferAllocator{
 
-    private Accountor innerAcct;
+    private Accountor childAcct;
     
     public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, 
long max, long pre) throws OutOfMemoryException{
-      innerAcct = new Accountor(handle, parentAccountor, max, pre);
+      childAcct = new Accountor(handle, parentAccountor, max, pre);
     }
     
-    
     @Override
     public AccountingByteBuf buffer(int size, int max) {
-      if(!innerAcct.reserve(size)){
+      if(!childAcct.reserve(size)){
         return null;
       };
       
       ByteBuf buffer = innerAllocator.directBuffer(size, max);
-      if(buffer.maxCapacity() > max) buffer.capacity(max);
-      AccountingByteBuf wrapped = new AccountingByteBuf(innerAcct, 
(PooledUnsafeDirectByteBufL) buffer);
-      innerAcct.reserved(buffer.maxCapacity(), wrapped);
+      AccountingByteBuf wrapped = new AccountingByteBuf(childAcct, 
(PooledUnsafeDirectByteBufL) buffer);
+      childAcct.reserved(buffer.capacity(), wrapped);
       return wrapped;
     }
     
@@ -111,21 +120,24 @@ public class TopLevelAllocator implements BufferAllocator 
{
     @Override
     public BufferAllocator getChildAllocator(FragmentHandle handle, long 
initialReservation, long maximumReservation)
         throws OutOfMemoryException {
-      return new ChildAllocator(handle, innerAcct, maximumReservation, 
initialReservation);
+      if(!childAcct.reserve(initialReservation)){
+        throw new OutOfMemoryException(String.format("You attempted to create 
a new child allocator with initial reservation %d but only %d bytes of memory 
were available.", initialReservation, childAcct.getCapacity() - 
childAcct.getAllocation()));
+      };
+      return new ChildAllocator(handle, childAcct, maximumReservation, 
initialReservation);
     }
 
     public PreAllocator getNewPreAllocator(){
-      return new PreAlloc(this.innerAcct); 
+      return new PreAlloc(this.childAcct); 
     }
 
     @Override
     public void close() {
-      innerAcct.close();
+      childAcct.close();
     }
 
     @Override
     public long getAllocatedMemory() {
-      return innerAcct.getAllocation();
+      return childAcct.getAllocation();
     }
     
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
index 6fef7e5..23fa46d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
@@ -80,7 +80,7 @@ public class ProtobufLengthDecoder extends 
ByteToMessageDecoder {
           // TODO: Can we avoid this copy?
           ByteBuf outBuf = allocator.buffer(length);
           if(outBuf == null){
-            logger.debug("Failure allocating buffer on incoming stream due to 
memory limits.");
+            logger.warn("Failure allocating buffer on incoming stream due to 
memory limits.  Current Allocation: {}.", allocator.getAllocatedMemory());
             in.resetReaderIndex();
             return;
           }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/70dddc54/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 92614ca..835adad 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -81,7 +81,7 @@ public class ControlHandlerImpl implements 
ControlMessageHandler {
         return DataRpcConfig.OK;
 
       } catch (OutOfMemoryException e) {
-        logger.error("Failure while attempting to start remote fragment.", 
fragment);
+        logger.error("Failure while attempting to start remote fragment.", 
fragment, e);
         return new Response(RpcType.ACK, Acks.FAIL);
       }
       

Reply via email to