Repository: hive
Updated Branches:
  refs/heads/master aabe83dbf -> 79eb2243a


HIVE-15693: LLAP: cached threadpool in AMReporter creates too many threads 
leading to OOM (Rajesh Balamohan, reviewed by Siddharth Seth, Lefty Leverenz)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/79eb2243
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/79eb2243
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/79eb2243

Branch: refs/heads/master
Commit: 79eb2243aac6b5b0f5678369d579581a91a1925f
Parents: aabe83d
Author: Rajesh Balamohan <rbalamo...@apache.org>
Authored: Sat Jan 28 04:41:35 2017 +0530
Committer: Rajesh Balamohan <rbalamo...@apache.org>
Committed: Sat Jan 28 04:41:35 2017 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 ++++
 .../hive/llap/daemon/impl/AMReporter.java       | 20 ++++++++++++++------
 .../hive/llap/daemon/impl/LlapDaemon.java       |  4 +++-
 3 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/79eb2243/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1e7bfd7..4e83867 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3081,6 +3081,10 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
       "Number of executors to use in LLAP daemon; essentially, the number of 
tasks that can be\n" +
       "executed in parallel.", "llap.daemon.num.executors"),
+    
LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 
4,
+        "Maximum number of threads to be used for AM reporter. If this is 
lower than number of\n" +
+        "executors in llap daemon, it would be set to number of executors at 
runtime.",
+        "llap.daemon.am-reporter.max.threads"),
     LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 0, "The LLAP daemon RPC 
port.",
       "llap.daemon.rpc.port. A value of 0 indicates a dynamic port"),
     
LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 
4096,

http://git-wip-us.apache.org/repos/asf/hive/blob/79eb2243/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 027d8eb..93237e6 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -40,6 +40,8 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -90,8 +92,6 @@ public class AMReporter extends AbstractService {
   Ignore exceptions when communicating with the AM.
   At a later point, report back saying the AM is dead so that tasks can be 
removed from the running queue.
 
-  Use a cachedThreadPool so that a few AMs going down does not affect other 
AppMasters.
-
   Race: When a task completes - it sends out its message via the regular 
TaskReporter. The AM after this may run another DAG,
   or may die. This may need to be consolidated with the LlapTaskReporter. Try 
ensuring there's no race between the two.
 
@@ -118,15 +118,23 @@ public class AMReporter extends AbstractService {
   volatile ListenableFuture<Void> queueLookupFuture;
   private final DaemonId daemonId;
 
-  public AMReporter(AtomicReference<InetSocketAddress> localAddress,
-      QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId 
daemonId) {
+  public AMReporter(int numExecutors, int maxThreads, 
AtomicReference<InetSocketAddress>
+      localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, 
DaemonId daemonId) {
     super(AMReporter.class.getName());
     this.localAddress = localAddress;
     this.queryFailedHandler = queryFailedHandler;
     this.conf = conf;
     this.daemonId = daemonId;
-    ExecutorService rawExecutor = Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter 
%d").build());
+    if (maxThreads < numExecutors) {
+      maxThreads = numExecutors;
+      LOG.warn("maxThreads={} is less than numExecutors={}. Setting 
maxThreads=numExecutors",
+          maxThreads, numExecutors);
+    }
+    ExecutorService rawExecutor =
+        new ThreadPoolExecutor(numExecutors, maxThreads,
+            60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1,
         new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());

http://git-wip-us.apache.org/repos/asf/hive/blob/79eb2243/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 519bfbd..cca6bc6 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -254,7 +254,9 @@ public class LlapDaemon extends CompositeService implements 
ContainerRunner, Lla
     LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
         " sessionId: " + sessionId);
 
-    this.amReporter = new AMReporter(srvAddress, new 
QueryFailedHandlerProxy(), daemonConf, daemonId);
+    int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, 
ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS);
+    this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, 
srvAddress,
+        new QueryFailedHandlerProxy(), daemonConf, daemonId);
 
     SecretManager sm = null;
     if (UserGroupInformation.isSecurityEnabled()) {

Reply via email to