Repository: incubator-drill
Updated Branches:
  refs/heads/master 28f5a9ab6 -> 9628f9bb5


DRILL-1411 Fragment memory planning


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0988c08d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0988c08d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0988c08d

Branch: refs/heads/master
Commit: 0988c08d29c59c0447b078fc60bb4d01e14fbcbc
Parents: 28f5a9a
Author: Parth Chandra <pchan...@maprtech.com>
Authored: Sat Sep 13 20:29:47 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Tue Sep 23 22:22:43 2014 -0700

----------------------------------------------------------------------
 .../java/io/netty/buffer/FakeAllocator.java     |  22 ++-
 .../org/apache/drill/exec/memory/Accountor.java | 113 ++++++++++-
 .../drill/exec/memory/AtomicRemainder.java      |  35 +++-
 .../drill/exec/memory/BufferAllocator.java      |  21 ++-
 .../drill/exec/memory/TopLevelAllocator.java    |  71 +++++--
 .../apache/drill/exec/ops/FragmentContext.java  |  29 ++-
 .../apache/drill/exec/ops/OperatorContext.java  |  11 +-
 .../drill/exec/physical/impl/BaseRootExec.java  |  10 +-
 .../drill/exec/physical/impl/ScanBatch.java     |   3 +-
 .../exec/physical/impl/SingleSenderCreator.java |   4 +-
 .../BroadcastSenderRootExec.java                |   4 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |   5 +-
 .../PartitionSenderRootExec.java                |   4 +-
 .../UnorderedReceiverBatch.java                 |   8 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |   3 +-
 .../drill/exec/record/AbstractRecordBatch.java  |  12 +-
 .../exec/store/parquet/ParquetRecordWriter.java |   2 +-
 .../exec/work/batch/SpoolingRawBatchBuffer.java |   2 +-
 .../drill/exec/work/foreman/QueryManager.java   |   4 +-
 .../drill/exec/memory/TestAllocators.java       | 189 +++++++++++++++++++
 .../test/resources/physical_allocator_test.json |  50 +++++
 21 files changed, 550 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
index bc69577..9ebf3e7 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
@@ -20,6 +20,7 @@ package io.netty.buffer;
 import org.apache.drill.exec.memory.Accountor;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
 class FakeAllocator implements BufferAllocator {
@@ -45,7 +46,8 @@ class FakeAllocator implements BufferAllocator {
   }
 
   @Override
-  public BufferAllocator getChildAllocator(FragmentHandle handle, long 
initialReservation, long maximumReservation)
+  public BufferAllocator getChildAllocator(FragmentContext context, long 
initialReservation, long maximumReservation,
+                                           boolean applyFragmentLimit)
       throws OutOfMemoryException {
     throw new UnsupportedOperationException();
   }
@@ -66,6 +68,21 @@ class FakeAllocator implements BufferAllocator {
   }
 
   @Override
+  public void resetFragmentLimits() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFragmentLimit(long l) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getFragmentLimit(){
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void close() {
   }
 
@@ -77,7 +94,7 @@ class FakeAllocator implements BufferAllocator {
   static class FakeAccountor extends Accountor {
 
     public FakeAccountor() {
-      super(false, null, null, 0, 0);
+      super(false, null, null, 0, 0, true);
     }
 
     @Override
@@ -131,7 +148,6 @@ class FakeAllocator implements BufferAllocator {
     }
 
 
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index d11f224..6ef46de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -20,10 +20,13 @@ package org.apache.drill.exec.memory;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.util.AssertionUtil;
@@ -40,22 +43,41 @@ public class Accountor {
   private final long total;
   private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = 
Maps.newConcurrentMap();
   private final FragmentHandle handle;
+  private String fragmentStr;
   private Accountor parent;
   private final boolean errorOnLeak;
+  private final boolean applyFragmentLimit;
 
-  public Accountor(boolean errorOnLeak, FragmentHandle handle, Accountor 
parent, long max, long preAllocated) {
+  private final FragmentContext fragmentContext;
+  long fragmentLimit;
+
+  // The top level Allocator has an accountor that keeps track of all the 
FragmentContexts currently executing.
+  // This enables the top level accountor to calculate a new fragment limit 
whenever necessary.
+  private final List<FragmentContext> fragmentContexts;
+
+  public Accountor(boolean errorOnLeak, FragmentContext context, Accountor 
parent, long max, long preAllocated, boolean applyFragLimit) {
     // TODO: fix preallocation stuff
     this.errorOnLeak = errorOnLeak;
     AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
     this.parent = parent;
-    this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, 
preAllocated);
+    this.applyFragmentLimit=applyFragLimit;
+    this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, 
preAllocated, applyFragmentLimit);
     this.total = max;
-    this.handle = handle;
+    this.fragmentContext=context;
+    this.handle = (context!=null) ? context.getHandle() : null;
+    this.fragmentStr= (handle!=null) ? ( 
handle.getMajorFragmentId()+":"+handle.getMinorFragmentId() ) : "0:0";
+    this.fragmentLimit=this.total; // Allow as much as possible to start with;
     if (ENABLE_ACCOUNTING) {
       buffers = Maps.newConcurrentMap();
     } else {
       buffers = null;
     }
+    this.fragmentContexts = new ArrayList<FragmentContext>();
+    if(parent!=null && parent.parent==null){ // Only add the fragment context 
to the fragment level accountor
+      synchronized(this) {
+        addFragmentContext(this.fragmentContext);
+      }
+    }
   }
 
   public boolean transferTo(Accountor target, DrillBuf buf, long size) {
@@ -65,7 +87,6 @@ public class Accountor {
     if (ENABLE_ACCOUNTING) {
       target.buffers.put(buf, new DebugStackTrace(buf.capacity(), 
Thread.currentThread().getStackTrace()));
     }
-
     return withinLimit;
   }
 
@@ -77,7 +98,7 @@ public class Accountor {
   }
 
   public long getCapacity() {
-    return total;
+    return fragmentLimit;
   }
 
   public long getAllocation() {
@@ -85,7 +106,8 @@ public class Accountor {
   }
 
   public boolean reserve(long size) {
-    return remainder.get(size);
+    logger.debug("Fragment:"+fragmentStr+" Reserved "+size+" bytes. Total 
Allocated: "+getAllocation());
+    return remainder.get(size, this.applyFragmentLimit);
   }
 
   public boolean forceAdditionalReservation(long size) {
@@ -135,7 +157,69 @@ public class Accountor {
     }
   }
 
+  private void addFragmentContext(FragmentContext c) {
+    if (parent != null){
+      parent.addFragmentContext(c);
+    }else {
+      logger.debug("Fragment "+fragmentStr+" added to root accountor");
+      synchronized(this) {
+        fragmentContexts.add(c);
+      }
+    }
+  }
+
+  private void removeFragmentContext(FragmentContext c) {
+    if (parent != null){
+      if (parent.parent==null){
+        // only fragment level allocators will have the fragment context saved
+        parent.removeFragmentContext(c);
+      }
+    }else{
+      logger.debug("Fragment "+fragmentStr+" removed from root accountor");
+      synchronized(this) {
+        fragmentContexts.remove(c);
+      }
+    }
+  }
+
+  public long resetFragmentLimits(){
+    // returns the new capacity
+    if(parent!=null){
+      parent.resetFragmentLimits();
+    }else {
+      //Get remaining memory available per fragment and distribute it EQUALLY 
among all the fragments.
+      //Fragments get the memory limit added to the amount already allocated.
+      //This favours fragments that are already running which will get a limit 
greater than newly started fragments.
+      //If the already running fragments end quickly, their limits will be 
assigned back to the remaining fragments
+      //quickly. If they are long running, then we want to favour them with 
larger limits anyway.
+      synchronized (this) {
+        int nFragments=fragmentContexts.size();
+        if(nFragments==0) {
+          nFragments = 1;
+        }
+        long allocatedMemory=0;
+        for(FragmentContext fragment: fragmentContexts){
+          BufferAllocator a = fragment.getAllocator();
+          if(a!=null) {
+            allocatedMemory += fragment.getAllocator().getAllocatedMemory();
+          }
+        }
+        long rem=(total-allocatedMemory)/nFragments;
+        for(FragmentContext fragment: fragmentContexts){
+          fragment.setFragmentLimit(rem);
+        }
+      }
+    }
+    return getCapacity();
+  }
+
   public void close() {
+    // remove the fragment context and reset fragment limits whenever an 
allocator closes
+    if(parent!=null && parent.parent==null) {
+      logger.debug("Fragment " + fragmentStr + "  accountor being closed");
+      removeFragmentContext(fragmentContext);
+    }
+    resetFragmentLimits();
 
     if (ENABLE_ACCOUNTING && !buffers.isEmpty()) {
       StringBuffer sb = new StringBuffer();
@@ -184,6 +268,23 @@ public class Accountor {
 
   }
 
+  public void setFragmentLimit(long add) {
+    // We ADD the limit to the current allocation. If none has been allocated, 
this
+    // sets a new limit. If memory has already been allocated, the fragment 
gets its
+    // limit based on the allocation, though this might still result in 
reducing the
+    // limit.
+
+    if (parent != null && parent.parent==null) { // This is a fragment level 
accountor
+      this.fragmentLimit=getAllocation()+add;
+      this.remainder.setLimit(this.fragmentLimit);
+      logger.debug("Fragment "+fragmentStr+" memory limit set to 
"+this.fragmentLimit);
+    }
+  }
+
+  public long getFragmentLimit(){
+    return this.fragmentLimit;
+  }
+
   public class DebugStackTrace {
 
     private StackTraceElement[] elements;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 263caa0..6a87ab4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -35,10 +35,13 @@ public class AtomicRemainder {
   private final long initTotal;
   private final long initShared;
   private final long initPrivate;
+  private long limit;       // An Allocator can set a variable limit less than 
or equal to the initTotal
+  private boolean hasLimit; // True for Atomic Remainders associated with a 
Fragment. May be true for Operator Level allocators some day.
   private boolean closed = false;
   private final boolean errorOnLeak;
+  private final boolean applyFragmentLimit;
 
-  public AtomicRemainder(boolean errorOnLeak, AtomicRemainder parent, long 
max, long pre) {
+  public AtomicRemainder(boolean errorOnLeak, AtomicRemainder parent, long 
max, long pre, boolean applyFragLimit) {
     this.errorOnLeak = errorOnLeak;
     this.parent = parent;
     this.availableShared = new AtomicLong(max - pre);
@@ -46,6 +49,9 @@ public class AtomicRemainder {
     this.initTotal = max;
     this.initShared = max - pre;
     this.initPrivate = pre;
+    this.limit = max;
+    this.hasLimit=false;
+    this.applyFragmentLimit=applyFragLimit; // If this is an operator that is 
exempt from the fragment limit, set this to false.
 //    logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", 
availableShared, availablePrivate, hashCode(), new Exception());
   }
 
@@ -58,13 +64,25 @@ public class AtomicRemainder {
   }
 
   /**
+   * Allow an allocator to constrain the remainder to a particular limit that 
is lower than the initTotal.
+   * If limit is larger than initTotal, then the function will do nothing and 
the hasLimit flag will not be set.
+   * @param limit
+   */
+  public void setLimit(long limit) {
+    if(limit<initTotal){
+      this.hasLimit=true;
+      this.limit=limit;
+    }
+
+  }
+  /**
    * Automatically allocate memory. This is used when an actual allocation 
happened to be larger than requested. This
    * memory has already been used up so it must be accurately accounted for in 
future allocations.
    *
    * @param size
    */
   public boolean forceGet(long size) {
-    if (get(size)) {
+    if (get(size, this.applyFragmentLimit)) {
       return true;
     } else {
       availableShared.addAndGet(size);
@@ -75,12 +93,19 @@ public class AtomicRemainder {
     }
   }
 
-  public boolean get(long size) {
+  public boolean get(long size, boolean applyFragmentLimitForChild) {
     if (availablePrivate.get() < 1) {
       // if there is no preallocated memory, we can operate normally.
 
       // if there is a parent allocator, check it before allocating.
-      if (parent != null && !parent.get(size)) {
+      if (parent != null && !parent.get(size, this.applyFragmentLimit)) {
+        return false;
+      }
+
+      // If we need to allocate memory beyond the allowed Fragment Limit
+      if(applyFragmentLimitForChild && this.applyFragmentLimit && 
this.hasLimit && (getUsed()+size > this.limit)){
+        logger.debug("No more memory. Fragment limit ("+this.limit +
+          " bytes) reached. Trying to allocate "+size+ " bytes. "+getUsed()+" 
bytes already allocated.");
         return false;
       }
 
@@ -111,7 +136,7 @@ public class AtomicRemainder {
 
         long additionalSpaceNeeded = -unaccount;
         // if there is a parent allocator, check it before allocating.
-        if (parent != null && !parent.get(additionalSpaceNeeded)) {
+        if (parent != null && !parent.get(additionalSpaceNeeded, 
this.applyFragmentLimit)) {
           // parent allocation failed, return space to private pool.
           availablePrivate.getAndAdd(size);
           return false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 8971eea..e072126 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
 
 import java.io.Closeable;
 
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
 /**
@@ -52,8 +53,8 @@ public interface BufferAllocator extends Closeable {
 
   public abstract ByteBufAllocator getUnderlyingAllocator();
 
-  public abstract BufferAllocator getChildAllocator(FragmentHandle handle, 
long initialReservation,
-      long maximumReservation) throws OutOfMemoryException;
+  public abstract BufferAllocator getChildAllocator(FragmentContext context, 
long initialReservation,
+      long maximumReservation, boolean applyFragmentLimit) throws 
OutOfMemoryException;
 
   /**
    * Take over ownership of fragment accounting.  Always takes over ownership.
@@ -65,6 +66,22 @@ public interface BufferAllocator extends Closeable {
 
   public PreAllocator getNewPreAllocator();
 
+  //public void addFragmentContext(FragmentContext c);
+
+  /**
+   * For Top Level Allocators. Reset the fragment limits for all allocators
+   */
+  public void resetFragmentLimits();
+
+  /**
+   * For Child allocators to set the Fragment limit for the corresponding 
fragment allocator.
+   * @param l the new fragment limit
+   */
+  public void setFragmentLimit(long l);
+
+  public long getFragmentLimit();
+
+
   /**
    * Not thread safe.
    */

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index a8e8a28..583b388 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -22,13 +22,16 @@ import io.netty.buffer.DrillBuf;
 import io.netty.buffer.PooledByteBufAllocatorL;
 import io.netty.buffer.UnsafeDirectLittleEndian;
 
-import java.util.HashMap;
 import java.util.IdentityHashMap;
+import java.util.HashMap;
 import java.util.Map;
+
 import java.util.Map.Entry;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.util.AssertionUtil;
 
@@ -42,7 +45,6 @@ public class TopLevelAllocator implements BufferAllocator {
   private final boolean errorOnLeak;
   private final DrillBuf empty;
 
-
   @Deprecated
   public TopLevelAllocator() {
     this(DrillConfig.getMaxDirectMemory());
@@ -53,9 +55,9 @@ public class TopLevelAllocator implements BufferAllocator {
     this(maximumAllocation, true);
   }
 
-  private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak) {
+  private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak){
     this.errorOnLeak = errorOnLeak;
-    this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0);
+    this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0, 
true);
     this.empty = DrillBuf.getEmpty(this, acct);
     this.childrenMap = ENABLE_ACCOUNTING ? new IdentityHashMap<ChildAllocator, 
StackTraceElement[]>() : null;
   }
@@ -100,13 +102,14 @@ public class TopLevelAllocator implements BufferAllocator {
   }
 
   @Override
-  public BufferAllocator getChildAllocator(FragmentHandle handle, long 
initialReservation, long maximumReservation) throws OutOfMemoryException {
-    if(!acct.reserve(initialReservation)) {
+  public BufferAllocator getChildAllocator(FragmentContext context, long 
initialReservation, long maximumReservation, boolean applyFragmentLimit) throws 
OutOfMemoryException {
+    if(!acct.reserve(initialReservation)){
+      logger.debug(String.format("You attempted to create a new child 
allocator with initial reservation %d but only %d bytes of memory were 
available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
       throw new OutOfMemoryException(String.format("You attempted to create a 
new child allocator with initial reservation %d but only %d bytes of memory 
were available.", initialReservation, acct.getCapacity() - 
acct.getAllocation()));
     };
     logger.debug("New child allocator with initial reservation {}", 
initialReservation);
-    ChildAllocator allocator = new ChildAllocator(handle, acct, 
maximumReservation, initialReservation, childrenMap);
-    if (ENABLE_ACCOUNTING) {
+    ChildAllocator allocator = new ChildAllocator(context, acct, 
maximumReservation, initialReservation, childrenMap, applyFragmentLimit);
+    if(ENABLE_ACCOUNTING){
       childrenMap.put(allocator, Thread.currentThread().getStackTrace());
     }
 
@@ -114,6 +117,21 @@ public class TopLevelAllocator implements BufferAllocator {
   }
 
   @Override
+  public void resetFragmentLimits() {
+    acct.resetFragmentLimits();
+  }
+
+  @Override
+  public void setFragmentLimit(long limit){
+    acct.setFragmentLimit(limit);
+  }
+
+  @Override
+  public long getFragmentLimit(){
+    return acct.getFragmentLimit();
+  }
+
+  @Override
   public void close() {
     if (ENABLE_ACCOUNTING) {
       for (Entry<ChildAllocator, StackTraceElement[]> child : 
childrenMap.entrySet()) {
@@ -147,12 +165,22 @@ public class TopLevelAllocator implements BufferAllocator {
     private Map<ChildAllocator, StackTraceElement[]> children = new 
HashMap<>();
     private boolean closed = false;
     private FragmentHandle handle;
+    private FragmentContext fragmentContext;
     private Map<ChildAllocator, StackTraceElement[]> thisMap;
-
-    public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, 
long max, long pre, Map<ChildAllocator, StackTraceElement[]> map) throws 
OutOfMemoryException{
+    private boolean applyFragmentLimit;
+
+    public ChildAllocator(FragmentContext context,
+                          Accountor parentAccountor,
+                          long max,
+                          long pre,
+                          Map<ChildAllocator,
+                          StackTraceElement[]> map,
+                          boolean applyFragmentLimit) throws 
OutOfMemoryException{
       assert max >= pre;
-      childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, 
pre);
-      this.handle = handle;
+      this.applyFragmentLimit=applyFragmentLimit;
+      childAcct = new Accountor(errorOnLeak, context, parentAccountor, max, 
pre, applyFragmentLimit);
+      this.fragmentContext=context;
+      this.handle = context.getHandle();
       thisMap = map;
       this.empty = DrillBuf.getEmpty(this, childAcct);
     }
@@ -188,13 +216,13 @@ public class TopLevelAllocator implements BufferAllocator {
     }
 
     @Override
-    public BufferAllocator getChildAllocator(FragmentHandle handle, long 
initialReservation, long maximumReservation)
+    public BufferAllocator getChildAllocator(FragmentContext context, long 
initialReservation, long maximumReservation, boolean applyFragmentLimit)
         throws OutOfMemoryException {
       if (!childAcct.reserve(initialReservation)) {
         throw new OutOfMemoryException(String.format("You attempted to create 
a new child allocator with initial reservation %d but only %d bytes of memory 
were available.", initialReservation, childAcct.getAvailable()));
       };
       logger.debug("New child allocator with initial reservation {}", 
initialReservation);
-      ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, 
maximumReservation, initialReservation, null);
+      ChildAllocator newChildAllocator = new ChildAllocator(context, 
childAcct, maximumReservation, initialReservation, null, applyFragmentLimit);
       this.children.put(newChildAllocator, 
Thread.currentThread().getStackTrace());
       return newChildAllocator;
     }
@@ -204,6 +232,21 @@ public class TopLevelAllocator implements BufferAllocator {
     }
 
     @Override
+    public void resetFragmentLimits(){
+      childAcct.resetFragmentLimits();
+    }
+
+    @Override
+    public void setFragmentLimit(long limit){
+      childAcct.setFragmentLimit(limit);
+    }
+
+    @Override
+    public long getFragmentLimit(){
+      return childAcct.getFragmentLimit();
+    }
+
+    @Override
     public void close() {
       if (ENABLE_ACCOUNTING) {
         if (thisMap != null) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index a888ea7..04e1937 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -104,7 +104,15 @@ public class FragmentContext implements Closeable {
     } catch (Exception e) {
       throw new ExecutionSetupException("Failure while reading plan options.", 
e);
     }
-    this.allocator = 
context.getAllocator().getChildAllocator(fragment.getHandle(), 
fragment.getMemInitial(), fragment.getMemMax());
+    // Add the fragment context to the root allocator.
+    // The QueryManager will call the root allocator to recalculate all the 
memory limits for all the fragments
+    try {
+      this.allocator = context.getAllocator().getChildAllocator(this, 
fragment.getMemInitial(), fragment.getMemMax(), true);
+      assert (allocator != null);
+    }catch(Throwable e){
+      throw new ExecutionSetupException("Failure while getting memory 
allocator for fragment.", e);
+    }
+
     this.loader = new QueryClassLoader(dbContext.getConfig(), fragmentOptions);
   }
 
@@ -176,11 +184,18 @@ public class FragmentContext implements Closeable {
    */
   @Deprecated
   public BufferAllocator getAllocator() {
+    if(allocator == null){
+      FragmentHandle handle=getHandle();
+      String 
frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0";
+      logger.debug("Fragment:"+frag+" Allocator is NULL");
+    }
     return allocator;
   }
 
-  public BufferAllocator getNewChildAllocator(long initialReservation, long 
maximumReservation) throws OutOfMemoryException {
-    return allocator.getChildAllocator(getHandle(), initialReservation, 
maximumReservation);
+  public BufferAllocator getNewChildAllocator(long initialReservation,
+                                              long maximumReservation,
+                                              boolean applyFragmentLimit) 
throws OutOfMemoryException {
+    return allocator.getChildAllocator(this, initialReservation, 
maximumReservation, applyFragmentLimit);
   }
 
   public <T> T getImplementationClass(ClassGenerator<T> cg) throws 
ClassTransformationException, IOException {
@@ -252,6 +267,10 @@ public class FragmentContext implements Closeable {
     return context.getConfig();
   }
 
+  public void setFragmentLimit(long limit) {
+    this.allocator.setFragmentLimit(limit);
+  }
+
   @Override
   public void close() {
     for (Thread thread: daemonThreads) {
@@ -267,7 +286,11 @@ public class FragmentContext implements Closeable {
     if (buffers != null) {
       buffers.close();
     }
+
+    FragmentHandle handle=getHandle();
+    String 
frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0";
     allocator.close();
+    logger.debug("Fragment:"+frag+" After close allocator is: 
"+allocator!=null?"OK":"NULL");
   }
 
   public DrillBuf replace(DrillBuf old, int newSize) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 54edf88..ccafa67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -36,17 +36,20 @@ public class OperatorContext implements Closeable {
   private PhysicalOperator popConfig;
   private OperatorStats stats;
   private LongObjectOpenHashMap<DrillBuf> managedBuffers = new 
LongObjectOpenHashMap<>();
+  private final boolean applyFragmentLimit;
 
-  public OperatorContext(PhysicalOperator popConfig, FragmentContext context) 
throws OutOfMemoryException {
-    this.allocator = 
context.getNewChildAllocator(popConfig.getInitialAllocation(), 
popConfig.getMaxAllocation());
+  public OperatorContext(PhysicalOperator popConfig, FragmentContext context, 
boolean applyFragmentLimit) throws OutOfMemoryException {
+    this.applyFragmentLimit=applyFragmentLimit;
+    this.allocator = 
context.getNewChildAllocator(popConfig.getInitialAllocation(), 
popConfig.getMaxAllocation(), applyFragmentLimit);
     this.popConfig = popConfig;
 
     OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), 
popConfig.getOperatorType(), getChildCount(popConfig));
     this.stats = context.getStats().getOperatorStats(def, allocator);
   }
 
-  public OperatorContext(PhysicalOperator popConfig, FragmentContext context, 
OperatorStats stats) throws OutOfMemoryException {
-    this.allocator = 
context.getNewChildAllocator(popConfig.getInitialAllocation(), 
popConfig.getMaxAllocation());
+  public OperatorContext(PhysicalOperator popConfig, FragmentContext context, 
OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
+    this.applyFragmentLimit=applyFragmentLimit;
+    this.allocator = 
context.getNewChildAllocator(popConfig.getInitialAllocation(), 
popConfig.getMaxAllocation(), applyFragmentLimit);
     this.popConfig = popConfig;
     this.stats     = stats;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index c2c3144..412da85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -33,13 +33,21 @@ public abstract class BaseRootExec implements RootExec {
   protected OperatorContext oContext = null;
 
   public BaseRootExec(FragmentContext context, PhysicalOperator config) throws 
OutOfMemoryException {
-    this.oContext = new OperatorContext(config, context, stats);
+    // Default path: the operator context is subject to the fragment memory limit.
+    // NOTE(review): `stats` is read here before it is assigned on the next statement,
+    // so the OperatorContext receives the field's prior value (presumably null), not
+    // the OperatorStats built below. Confirm this is intended.
+    this.oContext = new OperatorContext(config, context, stats, true);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
         config.getOperatorType(), OperatorContext.getChildCount(config)),
         oContext.getAllocator());
     context.getStats().addOperatorStats(this.stats);
   }
 
+  /**
+   * Creates a root exec around a caller-supplied operator context, so the caller
+   * decides whether that context's allocator is subject to the fragment memory limit
+   * (the sender root execs pass one built with {@code applyFragmentLimit == false}).
+   * A new OperatorStats is built for {@code config} using the context's allocator and
+   * registered with the fragment's stats.
+   *
+   * @param context fragment that owns this root exec
+   * @param oContext operator context to adopt
+   * @param config operator definition used for the stats profile
+   */
+  public BaseRootExec(FragmentContext context, OperatorContext oContext, 
PhysicalOperator config) throws OutOfMemoryException {
+    this.oContext = oContext;
+    stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
+      config.getOperatorType(), OperatorContext.getChildCount(config)),
+      oContext.getAllocator());
+    context.getStats().addOperatorStats(this.stats);
+  }
+
   @Override
   public final boolean next() {
     // Stats should have been initialized

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 2712e27..ad8bf96 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -87,7 +87,8 @@ public class ScanBatch implements RecordBatch {
       throw new ExecutionSetupException("A scan batch must contain at least 
one reader.");
     }
     this.currentReader = readers.next();
-    this.oContext = new OperatorContext(subScanConfig, context);
+    // Scan Batch is not subject to fragment memory limit
+    this.oContext = new OperatorContext(subScanConfig, context, false);
     this.currentReader.setOperatorContext(this.oContext);
     this.currentReader.setup(mutator);
     this.partitionColumns = partitionColumns.iterator();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 352deae..34196b7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -25,6 +25,7 @@ import 
org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -67,7 +68,8 @@ public class SingleSenderCreator implements 
RootCreator<SingleSender>{
     }
 
     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, 
SingleSender config) throws OutOfMemoryException {
-      super(context, config);
+      super(context, new OperatorContext(config, context, null, false), 
config);
+      //super(context, config);
       this.incoming = batch;
       assert(incoming != null);
       this.handle = context.getHandle();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index d09559d..c594e70 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
@@ -67,7 +68,8 @@ public class BroadcastSenderRootExec extends BaseRootExec {
   public BroadcastSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  BroadcastSender config) throws 
OutOfMemoryException {
-    super(context, config);
+    super(context, new OperatorContext(config, context, null, false), config);
+    //super(context, config);
     this.ok = true;
     this.context = context;
     this.incoming = incoming;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 29fd80f..ed49cf1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -120,8 +121,8 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
   public MergingRecordBatch(FragmentContext context,
                             MergingReceiverPOP config,
                             RawFragmentBatchProvider[] fragProviders) throws 
OutOfMemoryException {
-
-    super(config, context);
+    super(config, context, new OperatorContext(config, context, false));
+    //super(config, context);
     this.fragProviders = fragProviders;
     this.context = context;
     this.outgoingContainer = new VectorContainer();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 6ff0418..2c3e85a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
@@ -87,7 +88,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
   public PartitionSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws 
OutOfMemoryException {
-    super(context, operator);
+    super(context, new OperatorContext(operator, context, null, false), 
operator);
+    //super(context, operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 8e7d9c6..364fc4f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -27,8 +27,9 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -58,6 +59,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
   private OperatorStats stats;
   private boolean first = true;
   private UnorderedReceiver config;
+  OperatorContext oContext;
 
   public enum Metric implements MetricDef {
     BYTES_RECEIVED,
@@ -74,7 +76,8 @@ public class UnorderedReceiverBatch implements RecordBatch {
     this.context = context;
     // In normal case, batchLoader does not require an allocator. However, in 
case of splitAndTransfer of a value vector,
     // we may need an allocator for the new offset vector. Therefore, here we 
pass the context's allocator to batchLoader.
-    this.batchLoader = new RecordBatchLoader(context.getAllocator());
+    oContext = new OperatorContext(config, context, false);
+    this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
 
     this.stats = context.getStats().getOperatorStats(new 
OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null);
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
@@ -194,6 +197,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
   public void cleanup() {
     batchLoader.clear();
     fragProvider.cleanup();
+    // batchLoader was created with this context's allocator; closing the context after
+    // clearing the loader presumably lets its buffers be released before the allocator
+    // is closed.
+    oContext.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 52249e9..9c48838 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -128,7 +128,7 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
     SPILL_DIRECTORIES = 
config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
     dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
     uid = System.nanoTime();
-    copierAllocator = 
oContext.getAllocator().getChildAllocator(context.getHandle(), 10000000, 
20000000);
+    copierAllocator = oContext.getAllocator().getChildAllocator(context, 
10000000, 20000000, true);
   }
 
   @Override
@@ -286,6 +286,7 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
           batchesSinceLastSpill++;
           if ((spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
                   (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
+                  (totalSizeInMemory > .95 * 
oContext.getAllocator().getFragmentLimit()) ||
                   (batchGroups.size() > SPILL_THRESHOLD && 
batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
             mergeAndSpill();
             batchesSinceLastSpill = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index e8ad311..a835bee 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -41,11 +41,21 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
     super();
     this.context = context;
     this.popConfig = popConfig;
-    this.oContext = new OperatorContext(popConfig, context);
+    this.oContext = new OperatorContext(popConfig, context, true);
     this.stats = oContext.getStats();
     this.container = new VectorContainer(this.oContext);
   }
 
+  /**
+   * Creates a record batch that adopts a caller-supplied operator context instead of
+   * creating one subject to the fragment memory limit. The batch's stats are taken
+   * from that context and its output container is built on it. MergingRecordBatch
+   * uses this to supply a context built with {@code applyFragmentLimit == false}.
+   *
+   * @param popConfig operator definition
+   * @param context fragment that owns this batch
+   * @param oContext operator context to adopt
+   * @throws OutOfMemoryException as declared by the signature (source not visible here)
+   */
+  protected AbstractRecordBatch(T popConfig, FragmentContext context, 
OperatorContext oContext) throws OutOfMemoryException {
+    super();
+    this.context = context;
+    this.popConfig = popConfig;
+    this.oContext = oContext;
+    this.stats = oContext.getStats();
+    this.container = new VectorContainer(this.oContext);
+  }
+
+
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
     return container.iterator();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 5a6ba80..cdb4ba0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -97,7 +97,7 @@ public class ParquetRecordWriter extends 
ParquetOutputRecordWriter {
 
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) 
throws OutOfMemoryException{
     super();
-    this.oContext=new OperatorContext(writer, context);
+    // The writer's own operator context is subject to the fragment memory limit.
+    this.oContext=new OperatorContext(writer, context, true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index c7527d5..6ee93ab 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -78,7 +78,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer 
{
 
   public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount) 
throws IOException, OutOfMemoryException {
     this.context = context;
-    this.allocator = 
context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, 
ALLOCATOR_MAX_RESERVATION);
+    this.allocator = 
context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, 
ALLOCATOR_MAX_RESERVATION, true);
     this.threshold = 
context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
     Configuration conf = new Configuration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, 
context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 083dd95..a01a5f6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -115,8 +115,6 @@ public class QueryManager implements FragmentStatusListener{
         // if we do, record the fragment manager in the workBus.
         workBus.setRootFragmentManager(fragmentManager);
       }
-
-
     }
 
     // keep track of intermediate fragments (not root or leaf)
@@ -130,6 +128,8 @@ public class QueryManager implements FragmentStatusListener{
       sendRemoteFragment(f);
     }
 
+    bee.getContext().getAllocator().resetFragmentLimits();
+
     logger.debug("Fragment runs setup is complete.");
     running = true;
     if (cancelled && !stopped) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
new file mode 100644
index 0000000..c23cc10
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.memory;
+
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Exercises fragment-level memory limits. Operator contexts created with
+ * {@code applyFragmentLimit == true} are expected to fail an allocation that exceeds
+ * their fragment's limit, while a context created with {@code false} can make the same
+ * allocation. Sizes are tuned against TOP_LEVEL_MAX_ALLOC = 14000000, so the order and
+ * size of the allocations below matter.
+ */
+public class TestAllocators {
+
+  private static final Properties TEST_CONFIGURATIONS = new Properties() {
+    {
+      put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "14000000");
+    }
+  };
+
+  static String planFile="/physical_allocator_test.json";
+
+  // NOTE(review): rootAllocator is never assigned or read in this test.
+  BufferAllocator rootAllocator;
+  DrillConfig config;
+  Drillbit bit;
+  RemoteServiceSet serviceSet;
+  DrillbitContext bitContext;
+
+  @Test
+  public void testAllocators() throws Exception {
+
+    // Setup a drillbit (initializes a root allocator)
+
+    config = DrillConfig.create(TEST_CONFIGURATIONS);
+    serviceSet = RemoteServiceSet.getLocalServiceSet();
+    bit = new Drillbit(config, serviceSet);
+    bit.run();
+    bitContext = bit.getContext();
+    FunctionImplementationRegistry functionRegistry = 
bitContext.getFunctionImplementationRegistry();
+    StoragePluginRegistry storageRegistry = new 
StoragePluginRegistry(bitContext);
+
+    // Create a few Fragment Contexts
+
+    BitControl.PlanFragment.Builder 
pfBuilder1=BitControl.PlanFragment.newBuilder();
+    pfBuilder1.setMemInitial(1500000);
+    BitControl.PlanFragment pf1=pfBuilder1.build();
+    BitControl.PlanFragment.Builder 
pfBuilder2=BitControl.PlanFragment.newBuilder();
+    pfBuilder2.setMemInitial(500000);
+    // NOTE(review): pf2 is built from pfBuilder1, so pfBuilder2.setMemInitial(500000)
+    // has no effect and fragment 2 gets memInitial 1500000. This looks like a copy-paste
+    // slip, but the assertions below may depend on the current sizing; verify before
+    // switching to pfBuilder2.build().
+    BitControl.PlanFragment pf2=pfBuilder1.build();
+
+    FragmentContext fragmentContext1 = new FragmentContext(bitContext, pf1, 
null, functionRegistry);
+    FragmentContext fragmentContext2 = new FragmentContext(bitContext, pf2, 
null, functionRegistry);
+
+    // Get a few physical operators. Easiest way is to read a physical plan.
+    PhysicalPlanReader planReader = new PhysicalPlanReader(config, 
config.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), 
storageRegistry);
+    PhysicalPlan plan = 
planReader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(planFile),
 Charsets.UTF_8));
+    List<PhysicalOperator> physicalOperators = plan.getSortedOperators();
+    Iterator<PhysicalOperator> physicalOperatorIterator = 
physicalOperators.iterator();
+
+    PhysicalOperator physicalOperator1 = physicalOperatorIterator.next();
+    PhysicalOperator physicalOperator2 = physicalOperatorIterator.next();
+    PhysicalOperator physicalOperator3 = physicalOperatorIterator.next();
+    PhysicalOperator physicalOperator4 = physicalOperatorIterator.next();
+    PhysicalOperator physicalOperator5 = physicalOperatorIterator.next();
+    PhysicalOperator physicalOperator6 = physicalOperatorIterator.next();
+
+    // Create some bogus Operator profile defs and stats to create operator 
contexts
+    OpProfileDef def;
+    OperatorStats stats;
+
+    //Use some bogus operator type to create a new operator context.
+    def = new OpProfileDef(physicalOperator1.getOperatorId(), 
UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, 
OperatorContext.getChildCount(physicalOperator1));
+    stats = fragmentContext1.getStats().getOperatorStats(def, 
fragmentContext1.getAllocator());
+
+
+    // Add a couple of Operator Contexts
+    // Initial allocation = 1000000 bytes for all operators
+    OperatorContext oContext11 = new OperatorContext(physicalOperator1, 
fragmentContext1, true);
+    DrillBuf b11=oContext11.getAllocator().buffer(1000000);
+
+    OperatorContext oContext12 = new OperatorContext(physicalOperator2, 
fragmentContext1, stats, true);
+    DrillBuf b12=oContext12.getAllocator().buffer(500000);
+
+    OperatorContext oContext21 = new OperatorContext(physicalOperator3, 
fragmentContext2, true);
+
+    def = new OpProfileDef(physicalOperator4.getOperatorId(), 
UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, 
OperatorContext.getChildCount(physicalOperator4));
+    stats = fragmentContext2.getStats().getOperatorStats(def, 
fragmentContext2.getAllocator());
+    OperatorContext oContext22 = new OperatorContext(physicalOperator4, 
fragmentContext2, stats, true);
+    DrillBuf b22=oContext22.getAllocator().buffer(2000000);
+
+    // New Fragment begins
+    BitControl.PlanFragment.Builder 
pfBuilder3=BitControl.PlanFragment.newBuilder();
+    pfBuilder3.setMemInitial(1000000);
+    BitControl.PlanFragment pf3=pfBuilder3.build();
+
+    FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, 
null, functionRegistry);
+
+    // New fragment starts an operator that allocates an amount within the 
limit
+    def = new OpProfileDef(physicalOperator5.getOperatorId(), 
UserBitShared.CoreOperatorType.UNION_VALUE, 
OperatorContext.getChildCount(physicalOperator5));
+    stats = fragmentContext3.getStats().getOperatorStats(def, 
fragmentContext3.getAllocator());
+    OperatorContext oContext31 = new OperatorContext(physicalOperator5, 
fragmentContext3, stats, true);
+
+    DrillBuf b31a = oContext31.getAllocator().buffer(200000);
+
+    //Previously running operator completes
+    b22.release();
+    oContext22.close();
+
+    // Fragment 3 asks for more and fails
+    // Either a null buffer or an exception from buffer() counts as an allocation failure.
+    boolean outOfMem=false;
+    try {
+      DrillBuf b31b = oContext31.getAllocator().buffer(4000000);
+      if(b31b!=null) {
+        b31b.release();
+      }else{
+        outOfMem=true;
+      }
+    }catch(Exception e){
+      outOfMem=true;
+    }
+    assertEquals(true, (boolean)outOfMem);
+
+    // Operator is Exempt from Fragment limits. Fragment 3 asks for more and 
succeeds
+    outOfMem=false;
+    OperatorContext oContext32 = new OperatorContext(physicalOperator6, 
fragmentContext3, false);
+    DrillBuf b32=null;
+    try {
+      b32=oContext32.getAllocator().buffer(4000000);
+    }catch(Exception e){
+      outOfMem=true;
+    }finally{
+      if(b32!=null) {
+        b32.release();
+      }else{
+        outOfMem=true;
+      }
+      oContext32.close();
+    }
+    assertEquals(false, (boolean)outOfMem);
+
+    b11.release();
+    oContext11.close();
+    b12.release();
+    oContext12.close();
+    oContext21.close();
+    b31a.release();
+    oContext31.close();
+    // NOTE(review): bit, serviceSet and fragmentContext1..3 are never closed in this
+    // test; confirm whether the Drillbit and fragment allocators need explicit shutdown.
+
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/test/resources/physical_allocator_test.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/physical_allocator_test.json 
b/exec/java-exec/src/test/resources/physical_allocator_test.json
new file mode 100644
index 0000000..274ba88
--- /dev/null
+++ b/exec/java-exec/src/test/resources/physical_allocator_test.json
@@ -0,0 +1,50 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org",
+            entries:[
+              {records: 100, types: [
+                {name: "blue", type: "INT", mode: "REPEATED"},
+                {name: "red", type: "BIGINT", mode: "REPEATED"},
+                {name: "green", type: "INT", mode: "REQUIRED"}
+              ]}
+            ]
+        },
+        {
+            @id:2,
+            pop: "union-exchange",
+            child: 1
+        },
+        {
+            @id:3,
+            child: 2,
+            pop:"filter",
+            expr: "b > 5",
+            selectivity: 0.8
+        },
+        {
+            @id: 4,
+            child: 3,
+            pop: "mock-store"
+        },
+        {
+            @id:5,
+            child: 4,
+            pop: "union-exchange"
+        },
+        {
+            @id: 6,
+            child: 5,
+            pop: "screen"
+        }
+    ]
+}

Reply via email to