Repository: hive
Updated Branches: refs/heads/master af606ffd4 -> de532b1f9
HIVE-15959 : LLAP: fix headroom calculation and move it to daemon (Sergey Shelukhin, reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/de532b1f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/de532b1f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/de532b1f Branch: refs/heads/master Commit: de532b1f9bb21daa668dac0f2b4f2429c9b4bd37 Parents: af606ff Author: Sergey Shelukhin <[email protected]> Authored: Tue Feb 21 13:56:17 2017 -0800 Committer: Sergey Shelukhin <[email protected]> Committed: Tue Feb 21 13:56:17 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 12 ++-- .../hadoop/hive/llap/cli/LlapServiceDriver.java | 64 ++++++++------------ .../hive/llap/daemon/impl/LlapDaemon.java | 49 +++++++++++---- llap-server/src/main/resources/package.py | 8 +-- .../hive/llap/daemon/MiniLlapCluster.java | 2 +- 5 files changed, 72 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1af59ba..4faaa8b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -381,7 +381,7 @@ public class HiveConf extends Configuration { llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_RPC_PORT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname); - llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname); + 
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_XMX_HEADROOM.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS.varname); @@ -3072,11 +3072,11 @@ public class HiveConf extends Configuration { LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096, "The total amount of memory to use for the executors inside LLAP (in megabytes).", "llap.daemon.memory.per.instance.mb"), - LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.headroom.memory.per.instance.mb", 512, - "The total amount of memory deducted from daemon memory required for other LLAP services. The remaining memory" + - " will be used by the executors. If the cache is off-heap, Executor memory + Headroom memory = Xmx. If the " + - "cache is on-heap, Executor memory + Cache memory + Headroom memory = Xmx. The headroom memory has to be " + - "minimum of 5% from the daemon memory."), + LLAP_DAEMON_XMX_HEADROOM("hive.llap.daemon.xmx.headroom", "5%", + "The total amount of heap memory set aside by LLAP and not used by the executors. Can\n" + + "be specified as size (e.g. '512Mb'), or percentage (e.g. '5%'). 
Note that the latter is\n" + + "derived from the total daemon XMX, which can be different from the total executor\n" + + "memory if the cache is on-heap; although that's not the default configuration."), LLAP_DAEMON_VCPUS_PER_INSTANCE("hive.llap.daemon.vcpus.per.instance", 4, "The total number of vcpus to use for the executors inside LLAP.", "llap.daemon.vcpus.per.instance"), http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index a93d53a..e8517ab 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.llap.cli; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStreamWriter; @@ -48,17 +47,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants; -import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils; -import org.apache.slider.client.SliderClient; -import org.apache.slider.common.params.ActionCreateArgs; -import org.apache.slider.common.params.ActionDestroyArgs; -import org.apache.slider.common.params.ActionFreezeArgs; -import org.apache.slider.common.params.ActionInstallPackageArgs; -import 
org.apache.slider.core.exceptions.UnknownApplicationInstanceException; import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +76,6 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.eclipse.jetty.server.ssl.SslSocketConnector; import org.joda.time.DateTime; import org.json.JSONException; @@ -244,6 +235,7 @@ public class LlapServiceDriver { HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger()); propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger()); } + boolean isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT); if (options.getSize() != -1) { if (options.getCache() != -1) { @@ -263,8 +255,7 @@ public class LlapServiceDriver { + " smaller than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize()) + ")"); } - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT) - && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) { + if (isDirect && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) { // direct and not memory mapped Preconditions.checkArgument(options.getXmx() + options.getCache() <= options.getSize(), "Working memory (Xmx=" + LlapUtil.humanReadableByteCount(options.getXmx()) + ") + cache size (" @@ -273,19 +264,6 @@ public class LlapServiceDriver { } } - // This parameter is read in package.py - and nowhere else. Does not need to be part of - // HiveConf - that's just confusing. 
- final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1); - long containerSize = -1; - if (options.getSize() != -1) { - containerSize = options.getSize() / (1024 * 1024); - Preconditions.checkArgument(containerSize >= minAlloc, "Container size (" - + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater" - + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")"); - conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize); - propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, - String.valueOf(containerSize)); - } if (options.getExecutors() != -1) { conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors()); @@ -319,17 +297,30 @@ public class LlapServiceDriver { String.valueOf(xmxMb)); } - final long currentHeadRoom = options.getSize() - options.getXmx() - options.getCache(); - final long minHeadRoom = (long) (options.getXmx() * LlapDaemon.MIN_HEADROOM_PERCENT); - final long headRoom = currentHeadRoom < minHeadRoom ? 
minHeadRoom : currentHeadRoom; - final long headRoomMb = headRoom / (1024L * 1024L); - conf.setLong(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, headRoomMb); - propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, - String.valueOf(headRoomMb)); - - LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {} headroom memory: {}", - LlapUtil.humanReadableByteCount(options.getSize()), LlapUtil.humanReadableByteCount(options.getXmx()), - LlapUtil.humanReadableByteCount(options.getCache()), LlapUtil.humanReadableByteCount(headRoom)); + long size = options.getSize(); + if (size == -1) { + long heapSize = xmx; + if (!isDirect) { + heapSize += cache; + } + size = Math.min((long)(heapSize * 1.2), heapSize + 1024L*1024*1024); + if (isDirect) { + size += cache; + } + } + long containerSize = size / (1024 * 1024); + final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1); + Preconditions.checkArgument(containerSize >= minAlloc, "Container size (" + + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater" + + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")"); + conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize); + propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, + String.valueOf(containerSize)); + + LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {}", + LlapUtil.humanReadableByteCount(options.getSize()), + LlapUtil.humanReadableByteCount(options.getXmx()), + LlapUtil.humanReadableByteCount(options.getCache())); if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) { conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName()); @@ -642,9 +633,6 @@ public class LlapServiceDriver { configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); - configs.put(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, - HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB)); - configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE)); http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index e737fdd..fc9f530 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -83,7 +83,6 @@ import com.google.common.primitives.Ints; public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean { private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class); - public static final double MIN_HEADROOM_PERCENT = 0.05; private final Configuration shuffleHandlerConf; private final SecretManager secretManager; @@ -114,7 +113,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort, - int mngPort, int shufflePort, int webPort, String appName, final long headRoomBytes) { + int mngPort, int shufflePort, int webPort, String appName) { super("LlapDaemon"); printAsciiArt(); @@ -158,11 +157,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla this.maxJvmMemory = getTotalHeapSize(); this.llapIoEnabled = ioEnabled; - Preconditions.checkArgument(headRoomBytes < 
executorMemoryBytes, "LLAP daemon headroom size should be less " + - "than daemon max memory size. headRoomBytes: " + headRoomBytes + " executorMemoryBytes: " + executorMemoryBytes); - final long minHeadRoomBytes = (long) (executorMemoryBytes * MIN_HEADROOM_PERCENT); - final long headroom = headRoomBytes < minHeadRoomBytes ? minHeadRoomBytes : headRoomBytes; - this.executorMemoryPerInstance = executorMemoryBytes - headroom; + + long xmxHeadRoomBytes = determineXmxHeadroom(daemonConf, executorMemoryBytes, maxJvmMemory); + this.executorMemoryPerInstance = executorMemoryBytes - xmxHeadRoomBytes; this.ioMemoryPerInstance = ioMemoryBytes; this.numExecutors = numExecutors; this.localDirs = localDirs; @@ -173,11 +170,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla boolean enablePreemption = HiveConf.getBoolVar( daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION); LOG.warn("Attempting to start LlapDaemonConf with the following configuration: " + - "maxJvmMemory=" + maxJvmMemory + " (" + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" + + "maxJvmMemory=" + maxJvmMemory + " (" + + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" + ", requestedExecutorMemory=" + executorMemoryBytes + " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" + - ", llapIoCacheSize=" + ioMemoryBytes + " (" + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" + - ", headRoomMemory=" + headroom + " (" + LlapUtil.humanReadableByteCount(headroom) + ")" + + ", llapIoCacheSize=" + ioMemoryBytes + " (" + + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" + + ", xmxHeadRoomMemory=" + xmxHeadRoomBytes + " (" + + LlapUtil.humanReadableByteCount(xmxHeadRoomBytes) + ")" + ", adjustedExecutorMemory=" + executorMemoryPerInstance + " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" + ", numExecutors=" + numExecutors + @@ -293,6 +293,30 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, 
Lla addIfService(amReporter); } + private static long determineXmxHeadroom( + Configuration daemonConf, long executorMemoryBytes, long maxJvmMemory) { + String headroomStr = HiveConf.getVar(daemonConf, ConfVars.LLAP_DAEMON_XMX_HEADROOM).trim(); + long xmxHeadRoomBytes = Long.MAX_VALUE; + try { + if (headroomStr.endsWith("%")) { + long percentage = Integer.parseInt(headroomStr.substring(0, headroomStr.length() - 1)); + Preconditions.checkState(percentage >= 0 && percentage < 100, + "Headroom percentage should be in [0, 100) range; found " + headroomStr); + xmxHeadRoomBytes = maxJvmMemory * percentage / 100L; + } else { + xmxHeadRoomBytes = HiveConf.toSizeBytes(headroomStr); + } + } catch (NumberFormatException ex) { + throw new RuntimeException("Invalid headroom configuration " + headroomStr); + } + + Preconditions.checkArgument(xmxHeadRoomBytes < executorMemoryBytes, + "LLAP daemon headroom size should be less than daemon max memory size. headRoomBytes: " + + xmxHeadRoomBytes + " executorMemoryBytes: " + executorMemoryBytes + " (derived from " + + headroomStr + " out of xmx of " + maxJvmMemory + ")"); + return xmxHeadRoomBytes; + } + private static void initializeLogging(final Configuration conf) { long start = System.currentTimeMillis(); URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource( @@ -467,15 +491,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT); long executorMemoryBytes = HiveConf.getIntVar( daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; - long headroomBytes = HiveConf.getIntVar( - daemonConf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT); boolean isLlapIo = HiveConf.getBoolVar(daemonConf, 
HiveConf.ConfVars.LLAP_IO_ENABLED, true); + LlapDaemon.initializeLogging(daemonConf); llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, - appName, headroomBytes); + appName); LOG.info("Adding shutdown hook for LlapDaemon"); ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1); http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/main/resources/package.py ---------------------------------------------------------------------- diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py index 66648b6..8a378ef 100644 --- a/llap-server/src/main/resources/package.py +++ b/llap-server/src/main/resources/package.py @@ -20,17 +20,15 @@ class LlapResource(object): # convert to Mb self.cache = config["hive.llap.io.memory.size"] / (1024*1024.0) self.direct = config["hive.llap.io.allocator.direct"] - self.min_mb = -1 self.min_cores = -1 # compute heap + cache as final Xmx h = self.memory if (not self.direct): h += self.cache if size == -1: - c = min(h*1.2, h + 1024) # + 1024 or 20% - c += (self.direct and self.cache) or 0 - if self.min_mb > 0: - c = c + c%self.min_mb + print "Cannot determine the container size" + sys.exit(1) + return else: # do not mess with user input c = size http://git-wip-us.apache.org/repos/asf/hive/blob/de532b1f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index a9b23b6..06f6dac 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -164,7 +164,7 @@ public class 
MiniLlapCluster extends AbstractService { LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); for (int i = 0 ;i < numInstances ; i++) { llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed, 0); + ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed); llapDaemons[i].init(new Configuration(conf)); } LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
