DRILL-1411: Disable the fragment memory limit by default. Add an overcommit 
factor to the fragment limit. The limit can be enabled, and the overcommit 
factor set, through bootstrap parameters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8773cb0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8773cb0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8773cb0c

Branch: refs/heads/master
Commit: 8773cb0c0616d13d055220e6372b3659ecfdfdea
Parents: 4862b2b
Author: Parth Chandra <pchan...@maprtech.com>
Authored: Mon Sep 29 16:11:14 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Mon Sep 29 21:19:23 2014 -0700

----------------------------------------------------------------------
 .../java/io/netty/buffer/FakeAllocator.java     |  2 +-
 .../org/apache/drill/exec/ExecConstants.java    |  4 ++
 .../org/apache/drill/exec/memory/Accountor.java | 65 +++++++++++++++++++-
 .../drill/exec/memory/AtomicRemainder.java      | 13 +++-
 .../drill/exec/memory/TopLevelAllocator.java    | 12 ++--
 .../drill/exec/memory/TestAllocators.java       |  2 +
 6 files changed, 88 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java 
b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
index 9ebf3e7..df85e74 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
@@ -94,7 +94,7 @@ class FakeAllocator implements BufferAllocator {
   static class FakeAccountor extends Accountor {
 
     public FakeAccountor() {
-      super(false, null, null, 0, 0, true);
+      super(null, false, null, null, 0, 0, true);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 617115e..f01f577 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -74,6 +74,10 @@ public interface ExecConstants {
   public static final String SYS_STORE_PROVIDER_LOCAL_PATH = 
"drill.exec.sys.store.provider.local.path";
   public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = 
"drill.exec.sys.store.provider.local.write";
   public static final String ERROR_ON_MEMORY_LEAK = 
"drill.exec.debug.error_on_leak";
+  /** Fragment memory planning */
+  public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = 
"drill.exec.memory.enable_frag_limit";
+  public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = 
"drill.exec.memory.frag_mem_overcommit_factor";
+
 
   public static final String CLIENT_SUPPORT_COMPLEX_TYPES = 
"drill.client.supports-complex-types";
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 18c5072..bd64640 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.memory;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.typesafe.config.ConfigException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
@@ -29,9 +30,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.util.AssertionUtil;
 
 import java.util.Arrays;
@@ -48,7 +52,16 @@ public class Accountor {
   private final FragmentHandle handle;
   private String fragmentStr;
   private Accountor parent;
+
   private final boolean errorOnLeak;
+  // Some operators are not subject to the fragment limit. They set 
applyFragmentLimit to false.
+
+  private final boolean enableFragmentLimit;
+  private final double  fragmentMemOvercommitFactor;
+
+  private final boolean  DEFAULT_ENABLE_FRAGMENT_LIMIT=false;
+  private final double   DEFAULT_FRAGMENT_MEM_OVERCOMMIT_FACTOR=1.5;
+
   private final boolean applyFragmentLimit;
 
   private final FragmentContext fragmentContext;
@@ -58,12 +71,28 @@ public class Accountor {
   // This enables the top level accountor to calculate a new fragment limit 
whenever necessary.
   private final List<FragmentContext> fragmentContexts;
 
-  public Accountor(boolean errorOnLeak, FragmentContext context, Accountor 
parent, long max, long preAllocated, boolean applyFragLimit) {
+  public Accountor(DrillConfig config, boolean errorOnLeak, FragmentContext 
context, Accountor parent, long max, long preAllocated, boolean applyFragLimit) 
{
     // TODO: fix preallocation stuff
     this.errorOnLeak = errorOnLeak;
     AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
     this.parent = parent;
+
+    boolean enableFragmentLimit;
+    double  fragmentMemOvercommitFactor;
+
+    try {
+      enableFragmentLimit = 
config.getBoolean(ExecConstants.ENABLE_FRAGMENT_MEMORY_LIMIT);
+      fragmentMemOvercommitFactor = 
config.getDouble(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR);
+    }catch(Exception e){
+      enableFragmentLimit = DEFAULT_ENABLE_FRAGMENT_LIMIT;
+      fragmentMemOvercommitFactor = DEFAULT_FRAGMENT_MEM_OVERCOMMIT_FACTOR;
+    }
+    this.enableFragmentLimit = enableFragmentLimit;
+    this.fragmentMemOvercommitFactor = fragmentMemOvercommitFactor;
+
+
     this.applyFragmentLimit=applyFragLimit;
+
     this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, 
preAllocated, applyFragmentLimit);
     this.total = max;
     this.fragmentContext=context;
@@ -187,6 +216,10 @@ public class Accountor {
 
   public long resetFragmentLimits(){
     // returns the new capacity
+    if(!this.enableFragmentLimit){
+      return getCapacity();
+    }
+
     if(parent!=null){
       parent.resetFragmentLimits();
     }else {
@@ -209,7 +242,35 @@ public class Accountor {
         }
         long rem=(total-allocatedMemory)/nFragments;
         for(FragmentContext fragment: fragmentContexts){
-          fragment.setFragmentLimit(rem);
+          fragment.setFragmentLimit((long)(rem*fragmentMemOvercommitFactor));
+        }
+        if(logger.isDebugEnabled()){
+          StringBuffer sb= new StringBuffer();
+          sb.append("[root](0:0)");
+          sb.append("Allocated memory: ");
+          sb.append(this.getAllocation());
+          sb.append("Fragment Limit  : ");
+          sb.append(this.getFragmentLimit());
+          logger.debug(sb.toString());
+          for(FragmentContext fragment: fragmentContexts){
+            sb= new StringBuffer();
+            if (handle != null) {
+              sb.append("[");
+              sb.append(QueryIdHelper.getQueryId(handle.getQueryId()));
+              sb.append("](");
+              sb.append(handle.getMajorFragmentId());
+              sb.append(":");
+              sb.append(handle.getMinorFragmentId());
+              sb.append(")");
+            }else{
+              sb.append("[root](0:0)");
+            }
+            sb.append("Allocated memory: ");
+            sb.append(fragment.getAllocator().getAllocatedMemory());
+            sb.append("Fragment Limit  : ");
+            sb.append(fragment.getAllocator().getFragmentLimit());
+            logger.debug(sb.toString());
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 6a87ab4..6771497 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -104,8 +104,17 @@ public class AtomicRemainder {
 
       // If we need to allocate memory beyond the allowed Fragment Limit
       if(applyFragmentLimitForChild && this.applyFragmentLimit && 
this.hasLimit && (getUsed()+size > this.limit)){
-        logger.debug("No more memory. Fragment limit ("+this.limit +
-          " bytes) reached. Trying to allocate "+size+ " bytes. "+getUsed()+" 
bytes already allocated.");
+        if (parent != null) {
+          parent.returnAllocation(size);
+        }
+        StackTraceElement[] ste = (new Throwable()).getStackTrace();
+        StringBuffer sb = new StringBuffer();
+        for (StackTraceElement s : ste) {
+          sb.append(s.toString());
+          sb.append("\n");
+        }
+        logger.error("No more memory. Fragment limit ("+this.limit +
+          " bytes) reached. Trying to allocate "+size+ " bytes. "+getUsed()+" 
bytes already allocated.\n"+sb.toString());
         return false;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 583b388..32ec37f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -44,6 +44,7 @@ public class TopLevelAllocator implements BufferAllocator {
   private final Accountor acct;
   private final boolean errorOnLeak;
   private final DrillBuf empty;
+  private final DrillConfig config;
 
   @Deprecated
   public TopLevelAllocator() {
@@ -52,18 +53,19 @@ public class TopLevelAllocator implements BufferAllocator {
 
   @Deprecated
   public TopLevelAllocator(long maximumAllocation) {
-    this(maximumAllocation, true);
+    this(null, maximumAllocation, true);
   }
 
-  private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak){
+  private TopLevelAllocator(DrillConfig config, long maximumAllocation, 
boolean errorOnLeak){
+    this.config=(config!=null) ? config : DrillConfig.create();
     this.errorOnLeak = errorOnLeak;
-    this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0, 
true);
+    this.acct = new Accountor(config, errorOnLeak, null, null, 
maximumAllocation, 0, true);
     this.empty = DrillBuf.getEmpty(this, acct);
     this.childrenMap = ENABLE_ACCOUNTING ? new IdentityHashMap<ChildAllocator, 
StackTraceElement[]>() : null;
   }
 
   public TopLevelAllocator(DrillConfig config) {
-    this(Math.min(DrillConfig.getMaxDirectMemory(), 
config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)),
+    this(config, Math.min(DrillConfig.getMaxDirectMemory(), 
config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)),
         config.getBoolean(ExecConstants.ERROR_ON_MEMORY_LEAK)
         );
   }
@@ -178,7 +180,7 @@ public class TopLevelAllocator implements BufferAllocator {
                           boolean applyFragmentLimit) throws 
OutOfMemoryException{
       assert max >= pre;
       this.applyFragmentLimit=applyFragmentLimit;
-      childAcct = new Accountor(errorOnLeak, context, parentAccountor, max, 
pre, applyFragmentLimit);
+      childAcct = new Accountor(context.getConfig(), errorOnLeak, context, 
parentAccountor, max, pre, applyFragmentLimit);
       this.fragmentContext=context;
       this.handle = context.getHandle();
       thisMap = map;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index c23cc10..de54f6a 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -52,6 +52,8 @@ public class TestAllocators {
   private static final Properties TEST_CONFIGURATIONS = new Properties() {
     {
       put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "14000000");
+      put(ExecConstants.ENABLE_FRAGMENT_MEMORY_LIMIT, "true");
+      put(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.0");
     }
   };
 

Reply via email to