DRILL-1411: Disable the fragment memory limit feature by default. Add an overcommit factor to the fragment limit. The feature can be enabled, and the overcommit factor set, via bootstrap parameters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8773cb0c Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8773cb0c Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8773cb0c Branch: refs/heads/master Commit: 8773cb0c0616d13d055220e6372b3659ecfdfdea Parents: 4862b2b Author: Parth Chandra <pchan...@maprtech.com> Authored: Mon Sep 29 16:11:14 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Mon Sep 29 21:19:23 2014 -0700 ---------------------------------------------------------------------- .../java/io/netty/buffer/FakeAllocator.java | 2 +- .../org/apache/drill/exec/ExecConstants.java | 4 ++ .../org/apache/drill/exec/memory/Accountor.java | 65 +++++++++++++++++++- .../drill/exec/memory/AtomicRemainder.java | 13 +++- .../drill/exec/memory/TopLevelAllocator.java | 12 ++-- .../drill/exec/memory/TestAllocators.java | 2 + 6 files changed, 88 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java index 9ebf3e7..df85e74 100644 --- a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java +++ b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java @@ -94,7 +94,7 @@ class FakeAllocator implements BufferAllocator { static class FakeAccountor extends Accountor { public FakeAccountor() { - super(false, null, null, 0, 0, true); + super(null, false, null, null, 0, 0, true); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 617115e..f01f577 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -74,6 +74,10 @@ public interface ExecConstants { public static final String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path"; public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write"; public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak"; + /** Fragment memory planning */ + public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = "drill.exec.memory.enable_frag_limit"; + public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = "drill.exec.memory.frag_mem_overcommit_factor"; + public static final String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types"; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index 18c5072..bd64640 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.memory; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.typesafe.config.ConfigException; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; @@ -29,9 +30,12 @@ import 
java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentMap; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.util.AssertionUtil; import java.util.Arrays; @@ -48,7 +52,16 @@ public class Accountor { private final FragmentHandle handle; private String fragmentStr; private Accountor parent; + private final boolean errorOnLeak; + // some operators are no subject to the fragment limit. They set the applyFragmentLimit to false + + private final boolean enableFragmentLimit; + private final double fragmentMemOvercommitFactor; + + private final boolean DEFAULT_ENABLE_FRAGMENT_LIMIT=false; + private final double DEFAULT_FRAGMENT_MEM_OVERCOMMIT_FACTOR=1.5; + private final boolean applyFragmentLimit; private final FragmentContext fragmentContext; @@ -58,12 +71,28 @@ public class Accountor { // This enables the top level accountor to calculate a new fragment limit whenever necessary. private final List<FragmentContext> fragmentContexts; - public Accountor(boolean errorOnLeak, FragmentContext context, Accountor parent, long max, long preAllocated, boolean applyFragLimit) { + public Accountor(DrillConfig config, boolean errorOnLeak, FragmentContext context, Accountor parent, long max, long preAllocated, boolean applyFragLimit) { // TODO: fix preallocation stuff this.errorOnLeak = errorOnLeak; AtomicRemainder parentRemainder = parent != null ? 
parent.remainder : null; this.parent = parent; + + boolean enableFragmentLimit; + double fragmentMemOvercommitFactor; + + try { + enableFragmentLimit = config.getBoolean(ExecConstants.ENABLE_FRAGMENT_MEMORY_LIMIT); + fragmentMemOvercommitFactor = config.getDouble(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR); + }catch(Exception e){ + enableFragmentLimit = DEFAULT_ENABLE_FRAGMENT_LIMIT; + fragmentMemOvercommitFactor = DEFAULT_FRAGMENT_MEM_OVERCOMMIT_FACTOR; + } + this.enableFragmentLimit = enableFragmentLimit; + this.fragmentMemOvercommitFactor = fragmentMemOvercommitFactor; + + this.applyFragmentLimit=applyFragLimit; + this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, preAllocated, applyFragmentLimit); this.total = max; this.fragmentContext=context; @@ -187,6 +216,10 @@ public class Accountor { public long resetFragmentLimits(){ // returns the new capacity + if(!this.enableFragmentLimit){ + return getCapacity(); + } + if(parent!=null){ parent.resetFragmentLimits(); }else { @@ -209,7 +242,35 @@ public class Accountor { } long rem=(total-allocatedMemory)/nFragments; for(FragmentContext fragment: fragmentContexts){ - fragment.setFragmentLimit(rem); + fragment.setFragmentLimit((long)(rem*fragmentMemOvercommitFactor)); + } + if(logger.isDebugEnabled()){ + StringBuffer sb= new StringBuffer(); + sb.append("[root](0:0)"); + sb.append("Allocated memory: "); + sb.append(this.getAllocation()); + sb.append("Fragment Limit : "); + sb.append(this.getFragmentLimit()); + logger.debug(sb.toString()); + for(FragmentContext fragment: fragmentContexts){ + sb= new StringBuffer(); + if (handle != null) { + sb.append("["); + sb.append(QueryIdHelper.getQueryId(handle.getQueryId())); + sb.append("]("); + sb.append(handle.getMajorFragmentId()); + sb.append(":"); + sb.append(handle.getMinorFragmentId()); + sb.append(")"); + }else{ + sb.append("[root](0:0)"); + } + sb.append("Allocated memory: "); + sb.append(fragment.getAllocator().getAllocatedMemory()); + 
sb.append("Fragment Limit : "); + sb.append(fragment.getAllocator().getFragmentLimit()); + logger.debug(sb.toString()); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java index 6a87ab4..6771497 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java @@ -104,8 +104,17 @@ public class AtomicRemainder { // If we need to allocate memory beyond the allowed Fragment Limit if(applyFragmentLimitForChild && this.applyFragmentLimit && this.hasLimit && (getUsed()+size > this.limit)){ - logger.debug("No more memory. Fragment limit ("+this.limit + - " bytes) reached. Trying to allocate "+size+ " bytes. "+getUsed()+" bytes already allocated."); + if (parent != null) { + parent.returnAllocation(size); + } + StackTraceElement[] ste = (new Throwable()).getStackTrace(); + StringBuffer sb = new StringBuffer(); + for (StackTraceElement s : ste) { + sb.append(s.toString()); + sb.append("\n"); + } + logger.error("No more memory. Fragment limit ("+this.limit + + " bytes) reached. Trying to allocate "+size+ " bytes. 
"+getUsed()+" bytes already allocated.\n"+sb.toString()); return false; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index 583b388..32ec37f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -44,6 +44,7 @@ public class TopLevelAllocator implements BufferAllocator { private final Accountor acct; private final boolean errorOnLeak; private final DrillBuf empty; + private final DrillConfig config; @Deprecated public TopLevelAllocator() { @@ -52,18 +53,19 @@ public class TopLevelAllocator implements BufferAllocator { @Deprecated public TopLevelAllocator(long maximumAllocation) { - this(maximumAllocation, true); + this(null, maximumAllocation, true); } - private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak){ + private TopLevelAllocator(DrillConfig config, long maximumAllocation, boolean errorOnLeak){ + this.config=(config!=null) ? config : DrillConfig.create(); this.errorOnLeak = errorOnLeak; - this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0, true); + this.acct = new Accountor(config, errorOnLeak, null, null, maximumAllocation, 0, true); this.empty = DrillBuf.getEmpty(this, acct); this.childrenMap = ENABLE_ACCOUNTING ? 
new IdentityHashMap<ChildAllocator, StackTraceElement[]>() : null; } public TopLevelAllocator(DrillConfig config) { - this(Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)), + this(config, Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)), config.getBoolean(ExecConstants.ERROR_ON_MEMORY_LEAK) ); } @@ -178,7 +180,7 @@ public class TopLevelAllocator implements BufferAllocator { boolean applyFragmentLimit) throws OutOfMemoryException{ assert max >= pre; this.applyFragmentLimit=applyFragmentLimit; - childAcct = new Accountor(errorOnLeak, context, parentAccountor, max, pre, applyFragmentLimit); + childAcct = new Accountor(context.getConfig(), errorOnLeak, context, parentAccountor, max, pre, applyFragmentLimit); this.fragmentContext=context; this.handle = context.getHandle(); thisMap = map; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8773cb0c/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java index c23cc10..de54f6a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -52,6 +52,8 @@ public class TestAllocators { private static final Properties TEST_CONFIGURATIONS = new Properties() { { put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "14000000"); + put(ExecConstants.ENABLE_FRAGMENT_MEMORY_LIMIT, "true"); + put(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.0"); } };