DRILL-1504: Enabling fragment memory limit causes out of memory error

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5d3a2291
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5d3a2291
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5d3a2291

Branch: refs/heads/master
Commit: 5d3a22911db55e2fac750025acd57e02689430ad
Parents: e302d98
Author: Parth Chandra <pchan...@maprtech.com>
Authored: Thu Sep 25 12:01:06 2014 -0700
Committer: Parth Chandra <pchan...@maprtech.com>
Committed: Wed Oct 15 22:49:17 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/memory/Accountor.java | 61 ++++++++++++++++----
 .../drill/exec/rpc/control/WorkEventBus.java    | 35 ++++++++---
 .../drill/exec/memory/TestAllocators.java       |  6 +-
 3 files changed, 79 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d3a2291/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index bd64640..67beb95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -193,7 +193,25 @@ public class Accountor {
     if (parent != null){
       parent.addFragmentContext(c);
     }else {
-      logger.debug("Fragment "+fragmentStr+" added to root accountor");
+      if(logger.isDebugEnabled()) {
+        FragmentHandle hndle;
+        String fragStr;
+        if(c!=null) {
+          hndle = c.getHandle();
+          fragStr = (hndle != null) ? (hndle.getMajorFragmentId() + ":" + 
hndle.getMinorFragmentId()) : "[Null Fragment Handle]";
+        }else{
+          fragStr = "[Null Context]";
+        }
+        fragStr+=" (Object Id: "+System.identityHashCode(c)+")";
+        StackTraceElement[] ste = (new Throwable()).getStackTrace();
+        StringBuffer sb = new StringBuffer();
+        for (StackTraceElement s : ste) {
+          sb.append(s.toString());
+          sb.append("\n");
+        }
+
+        logger.debug("Fragment " + fragStr + " added to root 
accountor.\n"+sb.toString());
+      }
       synchronized(this) {
         fragmentContexts.add(c);
       }
@@ -207,7 +225,18 @@ public class Accountor {
         parent.removeFragmentContext(c);
       }
     }else{
-      logger.debug("Fragment "+fragmentStr+" removed from root accountor");
+      if(logger.isDebugEnabled()) {
+        FragmentHandle hndle;
+        String fragStr;
+        if (c != null) {
+          hndle = c.getHandle();
+          fragStr = (hndle != null) ? (hndle.getMajorFragmentId() + ":" + 
hndle.getMinorFragmentId()) : "[Null Fragment Handle]";
+        } else {
+          fragStr = "[Null Context]";
+        }
+        fragStr += " (Object Id: " + System.identityHashCode(c) + ")";
+        logger.debug("Fragment " + fragStr + " removed from root accountor");
+      }
       synchronized(this) {
         fragmentContexts.remove(c);
       }
@@ -230,9 +259,6 @@ public class Accountor {
      //quickly. If they are long running, then we want to favour them with larger limits anyway.
       synchronized (this) {
         int nFragments=fragmentContexts.size();
-        if(nFragments==0) {
-          nFragments = 1;
-        }
         long allocatedMemory=0;
         for(FragmentContext fragment: fragmentContexts){
           BufferAllocator a = fragment.getAllocator();
@@ -240,16 +266,26 @@ public class Accountor {
             allocatedMemory += fragment.getAllocator().getAllocatedMemory();
           }
         }
-        long rem=(total-allocatedMemory)/nFragments;
-        for(FragmentContext fragment: fragmentContexts){
-          fragment.setFragmentLimit((long)(rem*fragmentMemOvercommitFactor));
+        if(logger.isDebugEnabled()) {
+          logger.info("Resetting Fragment Memory Limit: total Available 
memory== "+total
+            +" Total Allocated Memory :"+allocatedMemory
+            +" Number of fragments: "+nFragments
+            + " fragmentMemOvercommitFactor: "+fragmentMemOvercommitFactor
+            + " Root fragment limit: "+this.fragmentLimit + "(Root obj: 
"+System.identityHashCode(this)+")"
+          );
+        }
+        if(nFragments>0) {
+          long rem = (total - allocatedMemory) / nFragments;
+          for (FragmentContext fragment : fragmentContexts) {
+            fragment.setFragmentLimit((long) (rem * 
fragmentMemOvercommitFactor));
+          }
         }
-        if(logger.isDebugEnabled()){
+        if(logger.isDebugEnabled() && false){
           StringBuffer sb= new StringBuffer();
           sb.append("[root](0:0)");
           sb.append("Allocated memory: ");
           sb.append(this.getAllocation());
-          sb.append("Fragment Limit  : ");
+          sb.append(" Fragment Limit: ");
           sb.append(this.getFragmentLimit());
           logger.debug(sb.toString());
           for(FragmentContext fragment: fragmentContexts){
@@ -263,14 +299,15 @@ public class Accountor {
               sb.append(handle.getMinorFragmentId());
               sb.append(")");
             }else{
-              sb.append("[root](0:0)");
+              sb.append("[fragment](0:0)");
             }
             sb.append("Allocated memory: ");
             sb.append(fragment.getAllocator().getAllocatedMemory());
-            sb.append("Fragment Limit  : ");
+            sb.append(" Fragment Limit: ");
             sb.append(fragment.getAllocator().getFragmentLimit());
             logger.debug(sb.toString());
           }
+          logger.debug("Resetting Complete");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d3a2291/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index 5c126e1..45acd13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -104,31 +104,50 @@ public class WorkEventBus {
       logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", 
handle);
       return null;
     }
-    FragmentManager manager = managers.get(handle);
+    // We need to synchronize this part. Without that, multiple bit servers will be creating a Fragment manager and the
+    // corresponding FragmentContext object. Each FragmentContext object registers with the TopLevelAllocator so that
+    // the allocator can manage fragment resources across all fragments. So we need to make sure only one
+    // FragmentManager is actually created and used for a given FragmentHandle.
+    FragmentManager newManager;
+    FragmentManager manager;
+
+    manager = managers.get(handle);
     if (manager != null) {
       return manager;
     }
+    if (logger.isDebugEnabled()) {
+      String fragHandles = "Looking for Fragment handle: " + handle.toString() 
+ "(Hash Code:" + handle.hashCode()
+        + ")\n Fragment Handles in Fragment manager: ";
+      for (FragmentHandle h : managers.keySet()) {
+        fragHandles += h.toString() + "\n";
+        fragHandles += "[Hash Code: " + h.hashCode() + "]\n";
+      }
+      logger.debug(fragHandles);
+    }
     DistributedMap<FragmentHandle, PlanFragment> planCache = 
bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE);
-    for (Map.Entry<FragmentHandle, PlanFragment> e : 
planCache.getLocalEntries()) {
+//      for (Map.Entry<FragmentHandle, PlanFragment> e : 
planCache.getLocalEntries()) {
 //      logger.debug("Key: {}", e.getKey());
 //      logger.debug("Value: {}", e.getValue());
-    }
+//      }
     PlanFragment fragment = 
bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE).get(handle);
 
     if (fragment == null) {
       throw new FragmentSetupException("Received batch where fragment was not 
in cache.");
     }
+    logger.debug("Allocating new non root fragment manager: " + 
handle.toString());
+    newManager = new NonRootFragmentManager(fragment, bee);
+    logger.debug("Allocated new non root fragment manager: " + 
handle.toString());
 
-    FragmentManager newManager = new NonRootFragmentManager(fragment, bee);
-
-    // since their could be a race condition on the check, we'll use 
putIfAbsent so we don't have two competing
-    // handlers.
     manager = managers.putIfAbsent(fragment.getHandle(), newManager);
-
     if (manager == null) {
      // we added a handler, inform the bee that we did so. This way, the foreman can track status.
       bee.addFragmentPendingRemote(newManager);
       manager = newManager;
+    }else{
+      // prevent a leak of the initial allocation.
+      // Also the fragment context is registered with the top level allocator.
+      // This will unregister the unused fragment context as well.
+      newManager.getFragmentContext().close();
     }
 
     return manager;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d3a2291/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index de54f6a..92d4b98 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -53,7 +53,7 @@ public class TestAllocators {
     {
       put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "14000000");
       put(ExecConstants.ENABLE_FRAGMENT_MEMORY_LIMIT, "true");
-      put(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.0");
+      put(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.1");
     }
   };
 
@@ -148,7 +148,7 @@ public class TestAllocators {
     // Fragment 3 asks for more and fails
     boolean outOfMem=false;
     try {
-      DrillBuf b31b = oContext31.getAllocator().buffer(4000000);
+      DrillBuf b31b = oContext31.getAllocator().buffer(4400000);
       if(b31b!=null) {
         b31b.release();
       }else{
@@ -164,7 +164,7 @@ public class TestAllocators {
     OperatorContext oContext32 = new OperatorContext(physicalOperator6, 
fragmentContext3, false);
     DrillBuf b32=null;
     try {
-      b32=oContext32.getAllocator().buffer(4000000);
+      b32=oContext32.getAllocator().buffer(4400000);
     }catch(Exception e){
       outOfMem=true;
     }finally{

Reply via email to