HBASE-15784 Misuse core/maxPoolSize of LinkedBlockingQueue in
ThreadPoolExecutor (Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b5d5394
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b5d5394
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b5d5394

Branch: refs/heads/HBASE-14850
Commit: 7b5d5394c08e4d5a18c8f9fc62b45930bf892f41
Parents: b2b3b1f
Author: Ramkrishna <[email protected]>
Authored: Wed May 18 12:40:43 2016 +0530
Committer: Ramkrishna <[email protected]>
Committed: Wed May 18 12:40:43 2016 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/client/ConnectionImplementation.java     | 10 ++++++----
 hbase-client/src/test/resources/hbase-site.xml            |  4 ----
 .../hbase/replication/regionserver/HFileReplicator.java   |  2 +-
 .../regionserver/RegionReplicaReplicationEndpoint.java    |  6 +-----
 .../org/apache/hadoop/hbase/util/MultiHConnection.java    |  6 +-----
 .../org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java    |  1 -
 .../org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java  |  1 -
 .../apache/hadoop/hbase/util/TestHBaseFsckReplicas.java   |  1 -
 .../org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java  |  1 -
 hbase-server/src/test/resources/hbase-site.xml            |  4 ----
 .../hadoop/hbase/thrift/TBoundedThreadPoolServer.java     | 10 +++++++---
 .../apache/hadoop/hbase/thrift/ThriftServerRunner.java    |  6 ++++--
 12 files changed, 20 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 9a7dfc7..429e47d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -352,8 +352,8 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
     if (batchPool == null) {
       synchronized (this) {
         if (batchPool == null) {
-          this.batchPool = 
getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
-              conf.getInt("hbase.hconnection.threads.core", 256), "-shared", 
null);
+          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
+          this.batchPool = getThreadPool(threads, threads, "-shared", null);
           this.cleanupPool = true;
         }
       }
@@ -377,6 +377,7 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
         new LinkedBlockingQueue<Runnable>(maxThreads *
             conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
                 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+      coreThreads = maxThreads;
     }
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(
         coreThreads,
@@ -397,9 +398,10 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
           //To start with, threads.max.core threads can hit the meta 
(including replicas).
           //After that, requests will get queued up in the passed queue, and 
only after
           //the queue is full, a new thread will be started
+          int threads = 
conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
           this.metaLookupPool = getThreadPool(
-             conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
-             conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
+             threads,
+             threads,
              "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-client/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/resources/hbase-site.xml 
b/hbase-client/src/test/resources/hbase-site.xml
index 5788238..99d2ab8 100644
--- a/hbase-client/src/test/resources/hbase-site.xml
+++ b/hbase-client/src/test/resources/hbase-site.xml
@@ -26,10 +26,6 @@
     <value>true</value>
   </property>
   <property>
-    <name>hbase.hconnection.meta.lookup.threads.core</name>
-    <value>4</value>
-  </property>
-  <property>
     <name>hbase.hconnection.threads.keepalivetime</name>
     <value>3</value>
   </property>

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index 1a1044d..9893e7e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -107,7 +107,7 @@ public class HFileReplicator {
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
     builder.setNameFormat("HFileReplicationCallable-%1$d");
     this.exec =
-        new ThreadPoolExecutor(1, maxCopyThreads, 60, TimeUnit.SECONDS,
+        new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, 
TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(), builder.build());
     this.exec.allowCoreThreadTimeOut(true);
     this.copiesPerThread =

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index da37cfa..3611608 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -235,20 +235,16 @@ public class RegionReplicaReplicationEndpoint extends 
HBaseReplicationEndpoint {
    */
   private ExecutorService getDefaultThreadPool(Configuration conf) {
     int maxThreads = 
conf.getInt("hbase.region.replica.replication.threads.max", 256);
-    int coreThreads = 
conf.getInt("hbase.region.replica.replication.threads.core", 16);
     if (maxThreads == 0) {
       maxThreads = Runtime.getRuntime().availableProcessors() * 8;
     }
-    if (coreThreads == 0) {
-      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
-    }
     long keepAliveTime = 
conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
     LinkedBlockingQueue<Runnable> workQueue =
         new LinkedBlockingQueue<Runnable>(maxThreads *
             conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
               HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(
-      coreThreads,
+      maxThreads,
       maxThreads,
       keepAliveTime,
       TimeUnit.SECONDS,

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
index ed72ea2..6efb10c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
@@ -135,20 +135,16 @@ public class MultiHConnection {
   private void createBatchPool(Configuration conf) {
     // Use the same config for keep alive as in 
ConnectionImplementation.getBatchPool();
     int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256);
-    int coreThreads = conf.getInt("hbase.multihconnection.threads.core", 256);
     if (maxThreads == 0) {
       maxThreads = Runtime.getRuntime().availableProcessors() * 8;
     }
-    if (coreThreads == 0) {
-      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
-    }
     long keepAliveTime = 
conf.getLong("hbase.multihconnection.threads.keepalivetime", 60);
     LinkedBlockingQueue<Runnable> workQueue =
         new LinkedBlockingQueue<Runnable>(maxThreads
             * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
               HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
     ThreadPoolExecutor tpe =
-        new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, 
TimeUnit.SECONDS, workQueue,
+        new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, 
TimeUnit.SECONDS, workQueue,
             Threads.newDaemonThreadFactory("MultiHConnection" + "-shared-"));
     tpe.allowCoreThreadTimeOut(true);
     this.batchPool = tpe;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java
index 8e96f83..b04689c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java
@@ -57,7 +57,6 @@ public class TestHBaseFsckMOB extends BaseTestHBaseFsck {
 
     conf.setInt("hbase.htable.threads.max", POOL_SIZE);
     conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
-    conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
     conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
     TEST_UTIL.startMiniCluster(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
index d5869ed..a7c0c55 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java
@@ -109,7 +109,6 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
 
     conf.setInt("hbase.htable.threads.max", POOL_SIZE);
     conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
-    conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
     conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
     TEST_UTIL.startMiniCluster(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java
index a859f78..2aa436c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java
@@ -64,7 +64,6 @@ public class TestHBaseFsckReplicas extends BaseTestHBaseFsck {
 
     conf.setInt("hbase.htable.threads.max", POOL_SIZE);
     conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
-    conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
     conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
     TEST_UTIL.startMiniCluster(3);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java
index 4865217..6d0e48c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java
@@ -73,7 +73,6 @@ public class TestHBaseFsckTwoRS extends BaseTestHBaseFsck {
 
     conf.setInt("hbase.htable.threads.max", POOL_SIZE);
     conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
-    conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
     conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
     TEST_UTIL.startMiniCluster(2);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/resources/hbase-site.xml 
b/hbase-server/src/test/resources/hbase-site.xml
index 1b429b5..bfe8ee1 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -163,10 +163,6 @@
     </description>
   </property>
   <property>
-    <name>hbase.hconnection.meta.lookup.threads.core</name>
-    <value>4</value>
-  </property>
-  <property>
     <name>hbase.hconnection.threads.keepalivetime</name>
     <value>3</value>
   </property>

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
index 84613bd..1b8ebc9 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
@@ -132,7 +132,7 @@ public class TBoundedThreadPoolServer extends TServer {
   }
 
   /** Executor service for handling client connections */
-  private ExecutorService executorService;
+  private ThreadPoolExecutor executorService;
 
   /** Flag for stopping the server */
   private volatile boolean stopped;
@@ -142,9 +142,12 @@ public class TBoundedThreadPoolServer extends TServer {
   public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
     super(options);
 
+    int minWorkerThreads = options.minWorkerThreads;
+    int maxWorkerThreads = options.maxWorkerThreads;
     if (options.maxQueuedRequests > 0) {
       this.callQueue = new CallQueue(
           new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
+      minWorkerThreads = maxWorkerThreads;
     } else {
       this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
     }
@@ -153,9 +156,10 @@ public class TBoundedThreadPoolServer extends TServer {
     tfb.setDaemon(true);
     tfb.setNameFormat("thrift-worker-%d");
     executorService =
-        new ThreadPoolExecutor(options.minWorkerThreads,
-            options.maxWorkerThreads, options.threadKeepAliveTimeSec,
+        new ThreadPoolExecutor(minWorkerThreads,
+            maxWorkerThreads, options.threadKeepAliveTimeSec,
             TimeUnit.SECONDS, this.callQueue, tfb.build());
+    executorService.allowCoreThreadTimeOut(true);
     serverOptions = options;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index 7d228dc..8767a3c 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -555,7 +555,7 @@ public class ThriftServerRunner implements Runnable {
         CallQueue callQueue =
             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
         ExecutorService executorService = createExecutor(
-            callQueue, serverArgs.getMinWorkerThreads(), 
serverArgs.getMaxWorkerThreads());
+            callQueue, serverArgs.getMaxWorkerThreads(), 
serverArgs.getMaxWorkerThreads());
         serverArgs.executorService(executorService)
                   .processor(processor)
                   .transportFactory(transportFactory)
@@ -620,8 +620,10 @@ public class ThriftServerRunner implements Runnable {
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setDaemon(true);
     tfb.setNameFormat("thrift-worker-%d");
-    return new ThreadPoolExecutor(minWorkers, maxWorkers,
+    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minWorkers, 
maxWorkers,
             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
+    threadPool.allowCoreThreadTimeOut(true);
+    return threadPool;
   }
 
   private InetAddress getBindAddress(Configuration conf)

Reply via email to