DRILL-1504: Enabling fragment memory limit causes out-of-memory error
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5d3a2291 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5d3a2291 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5d3a2291 Branch: refs/heads/master Commit: 5d3a22911db55e2fac750025acd57e02689430ad Parents: e302d98 Author: Parth Chandra <pchan...@maprtech.com> Authored: Thu Sep 25 12:01:06 2014 -0700 Committer: Parth Chandra <pchan...@maprtech.com> Committed: Wed Oct 15 22:49:17 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/memory/Accountor.java | 61 ++++++++++++++++---- .../drill/exec/rpc/control/WorkEventBus.java | 35 ++++++++--- .../drill/exec/memory/TestAllocators.java | 6 +- 3 files changed, 79 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d3a2291/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index bd64640..67beb95 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -193,7 +193,25 @@ public class Accountor { if (parent != null){ parent.addFragmentContext(c); }else { - logger.debug("Fragment "+fragmentStr+" added to root accountor"); + if(logger.isDebugEnabled()) { + FragmentHandle hndle; + String fragStr; + if(c!=null) { + hndle = c.getHandle(); + fragStr = (hndle != null) ? 
(hndle.getMajorFragmentId() + ":" + hndle.getMinorFragmentId()) : "[Null Fragment Handle]"; + }else{ + fragStr = "[Null Context]"; + } + fragStr+=" (Object Id: "+System.identityHashCode(c)+")"; + StackTraceElement[] ste = (new Throwable()).getStackTrace(); + StringBuffer sb = new StringBuffer(); + for (StackTraceElement s : ste) { + sb.append(s.toString()); + sb.append("\n"); + } + + logger.debug("Fragment " + fragStr + " added to root accountor.\n"+sb.toString()); + } synchronized(this) { fragmentContexts.add(c); } @@ -207,7 +225,18 @@ public class Accountor { parent.removeFragmentContext(c); } }else{ - logger.debug("Fragment "+fragmentStr+" removed from root accountor"); + if(logger.isDebugEnabled()) { + FragmentHandle hndle; + String fragStr; + if (c != null) { + hndle = c.getHandle(); + fragStr = (hndle != null) ? (hndle.getMajorFragmentId() + ":" + hndle.getMinorFragmentId()) : "[Null Fragment Handle]"; + } else { + fragStr = "[Null Context]"; + } + fragStr += " (Object Id: " + System.identityHashCode(c) + ")"; + logger.debug("Fragment " + fragStr + " removed from root accountor"); + } synchronized(this) { fragmentContexts.remove(c); } @@ -230,9 +259,6 @@ public class Accountor { //quickly. If they are long running, then we want to favour them with larger limits anyway. 
synchronized (this) { int nFragments=fragmentContexts.size(); - if(nFragments==0) { - nFragments = 1; - } long allocatedMemory=0; for(FragmentContext fragment: fragmentContexts){ BufferAllocator a = fragment.getAllocator(); @@ -240,16 +266,26 @@ public class Accountor { allocatedMemory += fragment.getAllocator().getAllocatedMemory(); } } - long rem=(total-allocatedMemory)/nFragments; - for(FragmentContext fragment: fragmentContexts){ - fragment.setFragmentLimit((long)(rem*fragmentMemOvercommitFactor)); + if(logger.isDebugEnabled()) { + logger.info("Resetting Fragment Memory Limit: total Available memory== "+total + +" Total Allocated Memory :"+allocatedMemory + +" Number of fragments: "+nFragments + + " fragmentMemOvercommitFactor: "+fragmentMemOvercommitFactor + + " Root fragment limit: "+this.fragmentLimit + "(Root obj: "+System.identityHashCode(this)+")" + ); + } + if(nFragments>0) { + long rem = (total - allocatedMemory) / nFragments; + for (FragmentContext fragment : fragmentContexts) { + fragment.setFragmentLimit((long) (rem * fragmentMemOvercommitFactor)); + } } - if(logger.isDebugEnabled()){ + if(logger.isDebugEnabled() && false){ StringBuffer sb= new StringBuffer(); sb.append("[root](0:0)"); sb.append("Allocated memory: "); sb.append(this.getAllocation()); - sb.append("Fragment Limit : "); + sb.append(" Fragment Limit: "); sb.append(this.getFragmentLimit()); logger.debug(sb.toString()); for(FragmentContext fragment: fragmentContexts){ @@ -263,14 +299,15 @@ public class Accountor { sb.append(handle.getMinorFragmentId()); sb.append(")"); }else{ - sb.append("[root](0:0)"); + sb.append("[fragment](0:0)"); } sb.append("Allocated memory: "); sb.append(fragment.getAllocator().getAllocatedMemory()); - sb.append("Fragment Limit : "); + sb.append(" Fragment Limit: "); sb.append(fragment.getAllocator().getFragmentLimit()); logger.debug(sb.toString()); } + logger.debug("Resetting Complete"); } } } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d3a2291/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index 5c126e1..45acd13 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -104,31 +104,50 @@ public class WorkEventBus { logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle); return null; } - FragmentManager manager = managers.get(handle); + // We need to synchronize this part. Without that, multiple bit servers will be creating a Fragment manager and the + // corresponding FragmentContext object. Each FragmentContext object registers with the TopLevelAllocator so that + // the allocator can manage fragment resources across all fragments. So we need to make sure only one + // FragmentManager is actually created and used for a given FragmentHandle. 
+ FragmentManager newManager; + FragmentManager manager; + + manager = managers.get(handle); if (manager != null) { return manager; } + if (logger.isDebugEnabled()) { + String fragHandles = "Looking for Fragment handle: " + handle.toString() + "(Hash Code:" + handle.hashCode() + + ")\n Fragment Handles in Fragment manager: "; + for (FragmentHandle h : managers.keySet()) { + fragHandles += h.toString() + "\n"; + fragHandles += "[Hash Code: " + h.hashCode() + "]\n"; + } + logger.debug(fragHandles); + } DistributedMap<FragmentHandle, PlanFragment> planCache = bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE); - for (Map.Entry<FragmentHandle, PlanFragment> e : planCache.getLocalEntries()) { +// for (Map.Entry<FragmentHandle, PlanFragment> e : planCache.getLocalEntries()) { // logger.debug("Key: {}", e.getKey()); // logger.debug("Value: {}", e.getValue()); - } +// } PlanFragment fragment = bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE).get(handle); if (fragment == null) { throw new FragmentSetupException("Received batch where fragment was not in cache."); } + logger.debug("Allocating new non root fragment manager: " + handle.toString()); + newManager = new NonRootFragmentManager(fragment, bee); + logger.debug("Allocated new non root fragment manager: " + handle.toString()); - FragmentManager newManager = new NonRootFragmentManager(fragment, bee); - - // since their could be a race condition on the check, we'll use putIfAbsent so we don't have two competing - // handlers. manager = managers.putIfAbsent(fragment.getHandle(), newManager); - if (manager == null) { // we added a handler, inform the bee that we did so. This way, the foreman can track status. bee.addFragmentPendingRemote(newManager); manager = newManager; + }else{ + // prevent a leak of the initial allocation. + // Also the fragment context is registered with the top level allocator. + // This will unregister the unused fragment context as well. 
+ newManager.getFragmentContext().close(); } return manager; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d3a2291/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java index de54f6a..92d4b98 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -53,7 +53,7 @@ public class TestAllocators { { put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "14000000"); put(ExecConstants.ENABLE_FRAGMENT_MEMORY_LIMIT, "true"); - put(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.0"); + put(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.1"); } }; @@ -148,7 +148,7 @@ public class TestAllocators { // Fragment 3 asks for more and fails boolean outOfMem=false; try { - DrillBuf b31b = oContext31.getAllocator().buffer(4000000); + DrillBuf b31b = oContext31.getAllocator().buffer(4400000); if(b31b!=null) { b31b.release(); }else{ @@ -164,7 +164,7 @@ public class TestAllocators { OperatorContext oContext32 = new OperatorContext(physicalOperator6, fragmentContext3, false); DrillBuf b32=null; try { - b32=oContext32.getAllocator().buffer(4000000); + b32=oContext32.getAllocator().buffer(4400000); }catch(Exception e){ outOfMem=true; }finally{