Repository: incubator-drill Updated Branches: refs/heads/master 28f5a9ab6 -> 9628f9bb5
DRILL-1411 Fragment memory planning Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0988c08d Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0988c08d Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0988c08d Branch: refs/heads/master Commit: 0988c08d29c59c0447b078fc60bb4d01e14fbcbc Parents: 28f5a9a Author: Parth Chandra <pchan...@maprtech.com> Authored: Sat Sep 13 20:29:47 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Tue Sep 23 22:22:43 2014 -0700 ---------------------------------------------------------------------- .../java/io/netty/buffer/FakeAllocator.java | 22 ++- .../org/apache/drill/exec/memory/Accountor.java | 113 ++++++++++- .../drill/exec/memory/AtomicRemainder.java | 35 +++- .../drill/exec/memory/BufferAllocator.java | 21 ++- .../drill/exec/memory/TopLevelAllocator.java | 71 +++++-- .../apache/drill/exec/ops/FragmentContext.java | 29 ++- .../apache/drill/exec/ops/OperatorContext.java | 11 +- .../drill/exec/physical/impl/BaseRootExec.java | 10 +- .../drill/exec/physical/impl/ScanBatch.java | 3 +- .../exec/physical/impl/SingleSenderCreator.java | 4 +- .../BroadcastSenderRootExec.java | 4 +- .../impl/mergereceiver/MergingRecordBatch.java | 5 +- .../PartitionSenderRootExec.java | 4 +- .../UnorderedReceiverBatch.java | 8 +- .../physical/impl/xsort/ExternalSortBatch.java | 3 +- .../drill/exec/record/AbstractRecordBatch.java | 12 +- .../exec/store/parquet/ParquetRecordWriter.java | 2 +- .../exec/work/batch/SpoolingRawBatchBuffer.java | 2 +- .../drill/exec/work/foreman/QueryManager.java | 4 +- .../drill/exec/memory/TestAllocators.java | 189 +++++++++++++++++++ .../test/resources/physical_allocator_test.json | 50 +++++ 21 files changed, 550 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java index bc69577..9ebf3e7 100644 --- a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java +++ b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java @@ -20,6 +20,7 @@ package io.netty.buffer; import org.apache.drill.exec.memory.Accountor; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; class FakeAllocator implements BufferAllocator { @@ -45,7 +46,8 @@ class FakeAllocator implements BufferAllocator { } @Override - public BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation) + public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, + boolean applyFragmentLimit) throws OutOfMemoryException { throw new UnsupportedOperationException(); } @@ -66,6 +68,21 @@ class FakeAllocator implements BufferAllocator { } @Override + public void resetFragmentLimits() { + throw new UnsupportedOperationException(); + } + + @Override + public void setFragmentLimit(long l) { + throw new UnsupportedOperationException(); + } + + @Override + public long getFragmentLimit(){ + throw new UnsupportedOperationException(); + } + + @Override public void close() { } @@ -77,7 +94,7 @@ class FakeAllocator implements BufferAllocator { static class FakeAccountor extends Accountor { public FakeAccountor() { - super(false, null, null, 0, 0); + super(false, null, null, 0, 0, true); } @Override @@ -131,7 +148,6 @@ class FakeAllocator implements BufferAllocator { } 
- } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index d11f224..6ef46de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -20,10 +20,13 @@ package org.apache.drill.exec.memory; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.concurrent.ConcurrentMap; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.util.AssertionUtil; @@ -40,22 +43,41 @@ public class Accountor { private final long total; private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap(); private final FragmentHandle handle; + private String fragmentStr; private Accountor parent; private final boolean errorOnLeak; + private final boolean applyFragmentLimit; - public Accountor(boolean errorOnLeak, FragmentHandle handle, Accountor parent, long max, long preAllocated) { + private final FragmentContext fragmentContext; + long fragmentLimit; + + // The top level Allocator has an accountor that keeps track of all the FragmentContexts currently executing. + // This enables the top level accountor to calculate a new fragment limit whenever necessary. 
+ private final List<FragmentContext> fragmentContexts; + + public Accountor(boolean errorOnLeak, FragmentContext context, Accountor parent, long max, long preAllocated, boolean applyFragLimit) { // TODO: fix preallocation stuff this.errorOnLeak = errorOnLeak; AtomicRemainder parentRemainder = parent != null ? parent.remainder : null; this.parent = parent; - this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, preAllocated); + this.applyFragmentLimit=applyFragLimit; + this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, preAllocated, applyFragmentLimit); this.total = max; - this.handle = handle; + this.fragmentContext=context; + this.handle = (context!=null) ? context.getHandle() : null; + this.fragmentStr= (handle!=null) ? ( handle.getMajorFragmentId()+":"+handle.getMinorFragmentId() ) : "0:0"; + this.fragmentLimit=this.total; // Allow as much as possible to start with; if (ENABLE_ACCOUNTING) { buffers = Maps.newConcurrentMap(); } else { buffers = null; } + this.fragmentContexts = new ArrayList<FragmentContext>(); + if(parent!=null && parent.parent==null){ // Only add the fragment context to the fragment level accountor + synchronized(this) { + addFragmentContext(this.fragmentContext); + } + } } public boolean transferTo(Accountor target, DrillBuf buf, long size) { @@ -65,7 +87,6 @@ public class Accountor { if (ENABLE_ACCOUNTING) { target.buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread().getStackTrace())); } - return withinLimit; } @@ -77,7 +98,7 @@ public class Accountor { } public long getCapacity() { - return total; + return fragmentLimit; } public long getAllocation() { @@ -85,7 +106,8 @@ public class Accountor { } public boolean reserve(long size) { - return remainder.get(size); + logger.debug("Fragment:"+fragmentStr+" Reserved "+size+" bytes. 
Total Allocated: "+getAllocation()); + return remainder.get(size, this.applyFragmentLimit); } public boolean forceAdditionalReservation(long size) { @@ -135,7 +157,69 @@ public class Accountor { } } + private void addFragmentContext(FragmentContext c) { + if (parent != null){ + parent.addFragmentContext(c); + }else { + logger.debug("Fragment "+fragmentStr+" added to root accountor"); + synchronized(this) { + fragmentContexts.add(c); + } + } + } + + private void removeFragmentContext(FragmentContext c) { + if (parent != null){ + if (parent.parent==null){ + // only fragment level allocators will have the fragment context saved + parent.removeFragmentContext(c); + } + }else{ + logger.debug("Fragment "+fragmentStr+" removed from root accountor"); + synchronized(this) { + fragmentContexts.remove(c); + } + } + } + + public long resetFragmentLimits(){ + // returns the new capacity + if(parent!=null){ + parent.resetFragmentLimits(); + }else { + //Get remaining memory available per fragment and distribute it EQUALLY among all the fragments. + //Fragments get the memory limit added to the amount already allocated. + //This favours fragments that are already running which will get a limit greater than newly started fragments. + //If the already running fragments end quickly, their limits will be assigned back to the remaining fragments + //quickly. If they are long running, then we want to favour them with larger limits anyway. 
+ synchronized (this) { + int nFragments=fragmentContexts.size(); + if(nFragments==0) { + nFragments = 1; + } + long allocatedMemory=0; + for(FragmentContext fragment: fragmentContexts){ + BufferAllocator a = fragment.getAllocator(); + if(a!=null) { + allocatedMemory += fragment.getAllocator().getAllocatedMemory(); + } + } + long rem=(total-allocatedMemory)/nFragments; + for(FragmentContext fragment: fragmentContexts){ + fragment.setFragmentLimit(rem); + } + } + } + return getCapacity(); + } + public void close() { + // remove the fragment context and reset fragment limits whenever an allocator closes + if(parent!=null && parent.parent==null) { + logger.debug("Fragment " + fragmentStr + " accountor being closed"); + removeFragmentContext(fragmentContext); + } + resetFragmentLimits(); if (ENABLE_ACCOUNTING && !buffers.isEmpty()) { StringBuffer sb = new StringBuffer(); @@ -184,6 +268,23 @@ public class Accountor { } + public void setFragmentLimit(long add) { + // We ADD the limit to the current allocation. If none has been allocated, this + // sets a new limit. If memory has already been allocated, the fragment gets its + // limit based on the allocation, though this might still result in reducing the + // limit. 
+ + if (parent != null && parent.parent==null) { // This is a fragment level accountor + this.fragmentLimit=getAllocation()+add; + this.remainder.setLimit(this.fragmentLimit); + logger.debug("Fragment "+fragmentStr+" memory limit set to "+this.fragmentLimit); + } + } + + public long getFragmentLimit(){ + return this.fragmentLimit; + } + public class DebugStackTrace { private StackTraceElement[] elements; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java index 263caa0..6a87ab4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java @@ -35,10 +35,13 @@ public class AtomicRemainder { private final long initTotal; private final long initShared; private final long initPrivate; + private long limit; // An Allocator can set a variable limit less than or equal to the initTotal + private boolean hasLimit; // True for Atomic Remainders associated with a Fragment. May be true for Operator Level allocators some day. 
private boolean closed = false; private final boolean errorOnLeak; + private final boolean applyFragmentLimit; - public AtomicRemainder(boolean errorOnLeak, AtomicRemainder parent, long max, long pre) { + public AtomicRemainder(boolean errorOnLeak, AtomicRemainder parent, long max, long pre, boolean applyFragLimit) { this.errorOnLeak = errorOnLeak; this.parent = parent; this.availableShared = new AtomicLong(max - pre); @@ -46,6 +49,9 @@ public class AtomicRemainder { this.initTotal = max; this.initShared = max - pre; this.initPrivate = pre; + this.limit = max; + this.hasLimit=false; + this.applyFragmentLimit=applyFragLimit; // If this is an operator that is exempt from the fragment limit, set this to false. // logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", availableShared, availablePrivate, hashCode(), new Exception()); } @@ -58,13 +64,25 @@ public class AtomicRemainder { } /** + * Allow an allocator to constrain the remainder to a particular limit that is lower than the initTotal. + * If limit is larger than initTotal, then the function will do nothing and the hasLimit flag will not be set. + * @param limit + */ + public void setLimit(long limit) { + if(limit<initTotal){ + this.hasLimit=true; + this.limit=limit; + } + + } + /** * Automatically allocate memory. This is used when an actual allocation happened to be larger than requested. This * memory has already been used up so it must be accurately accounted for in future allocations. * * @param size */ public boolean forceGet(long size) { - if (get(size)) { + if (get(size, this.applyFragmentLimit)) { return true; } else { availableShared.addAndGet(size); @@ -75,12 +93,19 @@ public class AtomicRemainder { } } - public boolean get(long size) { + public boolean get(long size, boolean applyFragmentLimitForChild) { if (availablePrivate.get() < 1) { // if there is no preallocated memory, we can operate normally. // if there is a parent allocator, check it before allocating. 
- if (parent != null && !parent.get(size)) { + if (parent != null && !parent.get(size, this.applyFragmentLimit)) { + return false; + } + + // If we need to allocate memory beyond the allowed Fragment Limit + if(applyFragmentLimitForChild && this.applyFragmentLimit && this.hasLimit && (getUsed()+size > this.limit)){ + logger.debug("No more memory. Fragment limit ("+this.limit + + " bytes) reached. Trying to allocate "+size+ " bytes. "+getUsed()+" bytes already allocated."); return false; } @@ -111,7 +136,7 @@ public class AtomicRemainder { long additionalSpaceNeeded = -unaccount; // if there is a parent allocator, check it before allocating. - if (parent != null && !parent.get(additionalSpaceNeeded)) { + if (parent != null && !parent.get(additionalSpaceNeeded, this.applyFragmentLimit)) { // parent allocation failed, return space to private pool. availablePrivate.getAndAdd(size); return false; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java index 8971eea..e072126 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java @@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf; import java.io.Closeable; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; /** @@ -52,8 +53,8 @@ public interface BufferAllocator extends Closeable { public abstract ByteBufAllocator getUnderlyingAllocator(); - public abstract BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, - long maximumReservation) throws OutOfMemoryException; + public abstract 
BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, + long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException; /** * Take over ownership of fragment accounting. Always takes over ownership. @@ -65,6 +66,22 @@ public interface BufferAllocator extends Closeable { public PreAllocator getNewPreAllocator(); + //public void addFragmentContext(FragmentContext c); + + /** + * For Top Level Allocators. Reset the fragment limits for all allocators + */ + public void resetFragmentLimits(); + + /** + * For Child allocators to set the Fragment limit for the corresponding fragment allocator. + * @param l the new fragment limit + */ + public void setFragmentLimit(long l); + + public long getFragmentLimit(); + + /** * Not thread safe. */ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index a8e8a28..583b388 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -22,13 +22,16 @@ import io.netty.buffer.DrillBuf; import io.netty.buffer.PooledByteBufAllocatorL; import io.netty.buffer.UnsafeDirectLittleEndian; -import java.util.HashMap; import java.util.IdentityHashMap; +import java.util.HashMap; import java.util.Map; + import java.util.Map.Entry; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.FragmentContext; + import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.util.AssertionUtil; @@ -42,7 +45,6 @@ public class TopLevelAllocator implements 
BufferAllocator { private final boolean errorOnLeak; private final DrillBuf empty; - @Deprecated public TopLevelAllocator() { this(DrillConfig.getMaxDirectMemory()); @@ -53,9 +55,9 @@ public class TopLevelAllocator implements BufferAllocator { this(maximumAllocation, true); } - private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak) { + private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak){ this.errorOnLeak = errorOnLeak; - this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0); + this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0, true); this.empty = DrillBuf.getEmpty(this, acct); this.childrenMap = ENABLE_ACCOUNTING ? new IdentityHashMap<ChildAllocator, StackTraceElement[]>() : null; } @@ -100,13 +102,14 @@ public class TopLevelAllocator implements BufferAllocator { } @Override - public BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation) throws OutOfMemoryException { - if(!acct.reserve(initialReservation)) { + public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException { + if(!acct.reserve(initialReservation)){ + logger.debug(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation())); throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation())); }; logger.debug("New child allocator with initial reservation {}", initialReservation); - ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation, childrenMap); - if (ENABLE_ACCOUNTING) { + ChildAllocator allocator = new 
ChildAllocator(context, acct, maximumReservation, initialReservation, childrenMap, applyFragmentLimit); + if(ENABLE_ACCOUNTING){ childrenMap.put(allocator, Thread.currentThread().getStackTrace()); } @@ -114,6 +117,21 @@ public class TopLevelAllocator implements BufferAllocator { } @Override + public void resetFragmentLimits() { + acct.resetFragmentLimits(); + } + + @Override + public void setFragmentLimit(long limit){ + acct.setFragmentLimit(limit); + } + + @Override + public long getFragmentLimit(){ + return acct.getFragmentLimit(); + } + + @Override public void close() { if (ENABLE_ACCOUNTING) { for (Entry<ChildAllocator, StackTraceElement[]> child : childrenMap.entrySet()) { @@ -147,12 +165,22 @@ public class TopLevelAllocator implements BufferAllocator { private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>(); private boolean closed = false; private FragmentHandle handle; + private FragmentContext fragmentContext; private Map<ChildAllocator, StackTraceElement[]> thisMap; - - public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre, Map<ChildAllocator, StackTraceElement[]> map) throws OutOfMemoryException{ + private boolean applyFragmentLimit; + + public ChildAllocator(FragmentContext context, + Accountor parentAccountor, + long max, + long pre, + Map<ChildAllocator, + StackTraceElement[]> map, + boolean applyFragmentLimit) throws OutOfMemoryException{ assert max >= pre; - childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, pre); - this.handle = handle; + this.applyFragmentLimit=applyFragmentLimit; + childAcct = new Accountor(errorOnLeak, context, parentAccountor, max, pre, applyFragmentLimit); + this.fragmentContext=context; + this.handle = context.getHandle(); thisMap = map; this.empty = DrillBuf.getEmpty(this, childAcct); } @@ -188,13 +216,13 @@ public class TopLevelAllocator implements BufferAllocator { } @Override - public BufferAllocator getChildAllocator(FragmentHandle handle, long 
initialReservation, long maximumReservation) + public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException { if (!childAcct.reserve(initialReservation)) { throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable())); }; logger.debug("New child allocator with initial reservation {}", initialReservation); - ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation, null); + ChildAllocator newChildAllocator = new ChildAllocator(context, childAcct, maximumReservation, initialReservation, null, applyFragmentLimit); this.children.put(newChildAllocator, Thread.currentThread().getStackTrace()); return newChildAllocator; } @@ -204,6 +232,21 @@ public class TopLevelAllocator implements BufferAllocator { } @Override + public void resetFragmentLimits(){ + childAcct.resetFragmentLimits(); + } + + @Override + public void setFragmentLimit(long limit){ + childAcct.setFragmentLimit(limit); + } + + @Override + public long getFragmentLimit(){ + return childAcct.getFragmentLimit(); + } + + @Override public void close() { if (ENABLE_ACCOUNTING) { if (thisMap != null) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index a888ea7..04e1937 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -104,7 +104,15 @@ public class 
FragmentContext implements Closeable { } catch (Exception e) { throw new ExecutionSetupException("Failure while reading plan options.", e); } - this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax()); + // Add the fragment context to the root allocator. + // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments + try { + this.allocator = context.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true); + assert (allocator != null); + }catch(Throwable e){ + throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e); + } + this.loader = new QueryClassLoader(dbContext.getConfig(), fragmentOptions); } @@ -176,11 +184,18 @@ public class FragmentContext implements Closeable { */ @Deprecated public BufferAllocator getAllocator() { + if(allocator == null){ + FragmentHandle handle=getHandle(); + String frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0"; + logger.debug("Fragment:"+frag+" Allocator is NULL"); + } return allocator; } - public BufferAllocator getNewChildAllocator(long initialReservation, long maximumReservation) throws OutOfMemoryException { - return allocator.getChildAllocator(getHandle(), initialReservation, maximumReservation); + public BufferAllocator getNewChildAllocator(long initialReservation, + long maximumReservation, + boolean applyFragmentLimit) throws OutOfMemoryException { + return allocator.getChildAllocator(this, initialReservation, maximumReservation, applyFragmentLimit); } public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException { @@ -252,6 +267,10 @@ public class FragmentContext implements Closeable { return context.getConfig(); } + public void setFragmentLimit(long limit) { + this.allocator.setFragmentLimit(limit); + } + @Override public void close() { for (Thread thread: 
daemonThreads) { @@ -267,7 +286,11 @@ public class FragmentContext implements Closeable { if (buffers != null) { buffers.close(); } + + FragmentHandle handle=getHandle(); + String frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0"; allocator.close(); + logger.debug("Fragment:"+frag+" After close allocator is: "+allocator!=null?"OK":"NULL"); } public DrillBuf replace(DrillBuf old, int newSize) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index 54edf88..ccafa67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -36,17 +36,20 @@ public class OperatorContext implements Closeable { private PhysicalOperator popConfig; private OperatorStats stats; private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>(); + private final boolean applyFragmentLimit; - public OperatorContext(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException { - this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation()); + public OperatorContext(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException { + this.applyFragmentLimit=applyFragmentLimit; + this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit); this.popConfig = popConfig; OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig)); this.stats = 
context.getStats().getOperatorStats(def, allocator); } - public OperatorContext(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats) throws OutOfMemoryException { - this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation()); + public OperatorContext(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException { + this.applyFragmentLimit=applyFragmentLimit; + this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit); this.popConfig = popConfig; this.stats = stats; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index c2c3144..412da85 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -33,13 +33,21 @@ public abstract class BaseRootExec implements RootExec { protected OperatorContext oContext = null; public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException { - this.oContext = new OperatorContext(config, context, stats); + this.oContext = new OperatorContext(config, context, stats, true); stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), OperatorContext.getChildCount(config)), oContext.getAllocator()); context.getStats().addOperatorStats(this.stats); } + public BaseRootExec(FragmentContext context, OperatorContext oContext, PhysicalOperator config) throws OutOfMemoryException { + 
this.oContext = oContext; + stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), + config.getOperatorType(), OperatorContext.getChildCount(config)), + oContext.getAllocator()); + context.getStats().addOperatorStats(this.stats); + } + @Override public final boolean next() { // Stats should have been initialized http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 2712e27..ad8bf96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -87,7 +87,8 @@ public class ScanBatch implements RecordBatch { throw new ExecutionSetupException("A scan batch must contain at least one reader."); } this.currentReader = readers.next(); - this.oContext = new OperatorContext(subScanConfig, context); + // Scan Batch is not subject to fragment memory limit + this.oContext = new OperatorContext(subScanConfig, context, false); this.currentReader.setOperatorContext(this.oContext); this.currentReader.setup(mutator); this.partitionColumns = partitionColumns.iterator(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 352deae..34196b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -25,6 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -67,7 +68,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ } public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { - super(context, config); + super(context, new OperatorContext(config, context, null, false), config); + //super(context, config); this.incoming = batch; assert(incoming != null); this.handle = context.getHandle(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index d09559d..c594e70 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.OperatorContext; 
import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; @@ -67,7 +68,8 @@ public class BroadcastSenderRootExec extends BaseRootExec { public BroadcastSenderRootExec(FragmentContext context, RecordBatch incoming, BroadcastSender config) throws OutOfMemoryException { - super(context, config); + super(context, new OperatorContext(config, context, null, false), config); + //super(context, config); this.ok = true; this.context = context; this.incoming = incoming; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 29fd80f..ed49cf1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -44,6 +44,7 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; @@ -120,8 +121,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> public MergingRecordBatch(FragmentContext context, MergingReceiverPOP config, RawFragmentBatchProvider[] fragProviders) 
throws OutOfMemoryException { - - super(config, context); + super(config, context, new OperatorContext(config, context, false)); + //super(config, context); this.fragProviders = fragProviders; this.context = context; this.outgoingContainer = new VectorContainer(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 6ff0418..2c3e85a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; @@ -87,7 +88,8 @@ public class PartitionSenderRootExec extends BaseRootExec { public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { - super(context, operator); + super(context, new OperatorContext(operator, context, null, false), operator); + //super(context, operator); this.incoming = incoming; this.operator = operator; this.context = context; 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 8e7d9c6..364fc4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -27,8 +27,9 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OpProfileDef; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; @@ -58,6 +59,7 @@ public class UnorderedReceiverBatch implements RecordBatch { private OperatorStats stats; private boolean first = true; private UnorderedReceiver config; + OperatorContext oContext; public enum Metric implements MetricDef { BYTES_RECEIVED, @@ -74,7 +76,8 @@ public class UnorderedReceiverBatch implements RecordBatch { this.context = context; // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector, // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader. 
- this.batchLoader = new RecordBatchLoader(context.getAllocator()); + oContext = new OperatorContext(config, context, false); + this.batchLoader = new RecordBatchLoader(oContext.getAllocator()); this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); @@ -194,6 +197,7 @@ public class UnorderedReceiverBatch implements RecordBatch { public void cleanup() { batchLoader.clear(); fragProvider.cleanup(); + oContext.close(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 52249e9..9c48838 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -128,7 +128,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS); dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES)); uid = System.nanoTime(); - copierAllocator = oContext.getAllocator().getChildAllocator(context.getHandle(), 10000000, 20000000); + copierAllocator = oContext.getAllocator().getChildAllocator(context, 10000000, 20000000, true); } @Override @@ -286,6 +286,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { batchesSinceLastSpill++; if ((spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) || (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) || + (totalSizeInMemory > .95 * 
oContext.getAllocator().getFragmentLimit()) || (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) { mergeAndSpill(); batchesSinceLastSpill = 0; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index e8ad311..a835bee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -41,11 +41,21 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements super(); this.context = context; this.popConfig = popConfig; - this.oContext = new OperatorContext(popConfig, context); + this.oContext = new OperatorContext(popConfig, context, true); this.stats = oContext.getStats(); this.container = new VectorContainer(this.oContext); } + /** Variant that adopts the caller-supplied {@code oContext} instead of creating one from {@code popConfig} and {@code context}; stats and the vector container are taken from it. */ protected AbstractRecordBatch(T popConfig, FragmentContext context, OperatorContext oContext) throws OutOfMemoryException { + super(); + this.context = context; + this.popConfig = popConfig; + this.oContext = oContext; + this.stats = oContext.getStats(); + this.container = new VectorContainer(this.oContext); + } + + + @Override public Iterator<VectorWrapper<?>> iterator() { return container.iterator(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 5a6ba80..cdb4ba0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -97,7 +97,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{ super(); - this.oContext=new OperatorContext(writer, context); + this.oContext=new OperatorContext(writer, context, true); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java index c7527d5..6ee93ab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java @@ -78,7 +78,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount) throws IOException, OutOfMemoryException { this.context = context; - this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION); + this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION, true); this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY); Configuration conf = new Configuration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM)); 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index 083dd95..a01a5f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -115,8 +115,6 @@ public class QueryManager implements FragmentStatusListener{ // if we do, record the fragment manager in the workBus. workBus.setRootFragmentManager(fragmentManager); } - - } // keep track of intermediate fragments (not root or leaf) @@ -130,6 +128,8 @@ public class QueryManager implements FragmentStatusListener{ sendRemoteFragment(f); } + bee.getContext().getAllocator().resetFragmentLimits(); + logger.debug("Fragment runs setup is complete."); running = true; if (cancelled && !stopped) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java new file mode 100644 index 0000000..c23cc10 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.memory; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OpProfileDef; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +/** Checks that operator allocators obtained through {@link OperatorContext} are either held to, or exempted from, the per-fragment memory limit. */ +public class TestAllocators { + + private static final Properties TEST_CONFIGURATIONS = new Properties() { + { + 
put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "14000000"); + } + }; + + static String planFile="/physical_allocator_test.json"; + + BufferAllocator rootAllocator; + DrillConfig config; + Drillbit bit; + RemoteServiceSet serviceSet; + DrillbitContext bitContext; + + /** Starts a Drillbit configured with TOP_LEVEL_MAX_ALLOC = 14000000, allocates from operators of three fragments, then expects a 4000000-byte request to fail for a limit-enforcing operator context and to succeed for an exempt one. */ + @Test + public void testAllocators() throws Exception { + // Setup a drillbit (initializes a root allocator) + + config = DrillConfig.create(TEST_CONFIGURATIONS); + serviceSet = RemoteServiceSet.getLocalServiceSet(); + bit = new Drillbit(config, serviceSet); + bit.run(); + bitContext = bit.getContext(); + FunctionImplementationRegistry functionRegistry = bitContext.getFunctionImplementationRegistry(); + StoragePluginRegistry storageRegistry = new StoragePluginRegistry(bitContext); + + // Create a few Fragment Contexts + + BitControl.PlanFragment.Builder pfBuilder1=BitControl.PlanFragment.newBuilder(); + pfBuilder1.setMemInitial(1500000); + BitControl.PlanFragment pf1=pfBuilder1.build(); + BitControl.PlanFragment.Builder pfBuilder2=BitControl.PlanFragment.newBuilder(); + pfBuilder2.setMemInitial(500000); + BitControl.PlanFragment pf2=pfBuilder2.build(); // built from pfBuilder2 so fragment 2 gets its own 500000-byte initial reservation + + FragmentContext fragmentContext1 = new FragmentContext(bitContext, pf1, null, functionRegistry); + FragmentContext fragmentContext2 = new FragmentContext(bitContext, pf2, null, functionRegistry); + + // Get a few physical operators. Easiest way is to read a physical plan. 
+ PhysicalPlanReader planReader = new PhysicalPlanReader(config, config.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), storageRegistry); + PhysicalPlan plan = planReader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8)); + List<PhysicalOperator> physicalOperators = plan.getSortedOperators(); + Iterator<PhysicalOperator> physicalOperatorIterator = physicalOperators.iterator(); + + PhysicalOperator physicalOperator1 = physicalOperatorIterator.next(); + PhysicalOperator physicalOperator2 = physicalOperatorIterator.next(); + PhysicalOperator physicalOperator3 = physicalOperatorIterator.next(); + PhysicalOperator physicalOperator4 = physicalOperatorIterator.next(); + PhysicalOperator physicalOperator5 = physicalOperatorIterator.next(); + PhysicalOperator physicalOperator6 = physicalOperatorIterator.next(); + + // Create some bogus Operator profile defs and stats to create operator contexts + OpProfileDef def; + OperatorStats stats; + + //Use some bogus operator type to create a new operator context. 
+ def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorContext.getChildCount(physicalOperator1)); + stats = fragmentContext1.getStats().getOperatorStats(def, fragmentContext1.getAllocator()); + + + // Add a couple of Operator Contexts + // Initial allocation = 1000000 bytes for all operators + OperatorContext oContext11 = new OperatorContext(physicalOperator1, fragmentContext1, true); + DrillBuf b11=oContext11.getAllocator().buffer(1000000); + + OperatorContext oContext12 = new OperatorContext(physicalOperator2, fragmentContext1, stats, true); + DrillBuf b12=oContext12.getAllocator().buffer(500000); + + OperatorContext oContext21 = new OperatorContext(physicalOperator3, fragmentContext2, true); + + def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, OperatorContext.getChildCount(physicalOperator4)); + stats = fragmentContext2.getStats().getOperatorStats(def, fragmentContext2.getAllocator()); + OperatorContext oContext22 = new OperatorContext(physicalOperator4, fragmentContext2, stats, true); + DrillBuf b22=oContext22.getAllocator().buffer(2000000); + + // New Fragment begins + BitControl.PlanFragment.Builder pfBuilder3=BitControl.PlanFragment.newBuilder(); + pfBuilder3.setMemInitial(1000000); + BitControl.PlanFragment pf3=pfBuilder3.build(); + + FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, null, functionRegistry); + + // New fragment starts an operator that allocates an amount within the limit + def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE, OperatorContext.getChildCount(physicalOperator5)); + stats = fragmentContext3.getStats().getOperatorStats(def, fragmentContext3.getAllocator()); + OperatorContext oContext31 = new OperatorContext(physicalOperator5, fragmentContext3, stats, true); + + DrillBuf b31a = oContext31.getAllocator().buffer(200000); + + //Previously 
running operator completes + b22.release(); + oContext22.close(); + + // Fragment 3 asks for more and fails + boolean outOfMem=false; + try { + DrillBuf b31b = oContext31.getAllocator().buffer(4000000); + if(b31b!=null) { + b31b.release(); + }else{ + outOfMem=true; + } + }catch(Exception e){ + outOfMem=true; + } + assertEquals(true, (boolean)outOfMem); + + // Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds + outOfMem=false; + OperatorContext oContext32 = new OperatorContext(physicalOperator6, fragmentContext3, false); + DrillBuf b32=null; + try { + b32=oContext32.getAllocator().buffer(4000000); + }catch(Exception e){ + outOfMem=true; + }finally{ + if(b32!=null) { + b32.release(); + }else{ + outOfMem=true; + } + oContext32.close(); + } + assertEquals(false, (boolean)outOfMem); + + b11.release(); + oContext11.close(); + b12.release(); + oContext12.close(); + oContext21.close(); + b31a.release(); + oContext31.close(); + + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0988c08d/exec/java-exec/src/test/resources/physical_allocator_test.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/physical_allocator_test.json b/exec/java-exec/src/test/resources/physical_allocator_test.json new file mode 100644 index 0000000..274ba88 --- /dev/null +++ b/exec/java-exec/src/test/resources/physical_allocator_test.json @@ -0,0 +1,50 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-sub-scan", + url: "http://apache.org", + entries:[ + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REPEATED"}, + {name: "red", type: "BIGINT", mode: "REPEATED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id:2, + pop: "union-exchange", + child: 1 + }, + { + @id:3, + child: 2, + pop:"filter", + expr: "b > 5", + selectivity: 0.8 + }, + { + @id: 4, + child: 3, + 
pop: "mock-store" + }, + { + @id:5, + child: 4, + pop: "union-exchange" + }, + { + @id: 6, + child: 5, + pop: "screen" + } + ] +}