HBASE-15784 Misuse core/maxPoolSize of LinkedBlockingQueue in ThreadPoolExecutor (Jingcheng Du)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b5d5394 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b5d5394 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b5d5394 Branch: refs/heads/HBASE-14850 Commit: 7b5d5394c08e4d5a18c8f9fc62b45930bf892f41 Parents: b2b3b1f Author: Ramkrishna <[email protected]> Authored: Wed May 18 12:40:43 2016 +0530 Committer: Ramkrishna <[email protected]> Committed: Wed May 18 12:40:43 2016 +0530 ---------------------------------------------------------------------- .../hadoop/hbase/client/ConnectionImplementation.java | 10 ++++++---- hbase-client/src/test/resources/hbase-site.xml | 4 ---- .../hbase/replication/regionserver/HFileReplicator.java | 2 +- .../regionserver/RegionReplicaReplicationEndpoint.java | 6 +----- .../org/apache/hadoop/hbase/util/MultiHConnection.java | 6 +----- .../org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java | 1 - .../org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java | 1 - .../apache/hadoop/hbase/util/TestHBaseFsckReplicas.java | 1 - .../org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java | 1 - hbase-server/src/test/resources/hbase-site.xml | 4 ---- .../hadoop/hbase/thrift/TBoundedThreadPoolServer.java | 10 +++++++--- .../apache/hadoop/hbase/thrift/ThriftServerRunner.java | 6 ++++-- 12 files changed, 20 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 9a7dfc7..429e47d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -352,8 +352,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (batchPool == null) { synchronized (this) { if (batchPool == null) { - this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), - conf.getInt("hbase.hconnection.threads.core", 256), "-shared", null); + int threads = conf.getInt("hbase.hconnection.threads.max", 256); + this.batchPool = getThreadPool(threads, threads, "-shared", null); this.cleanupPool = true; } } @@ -377,6 +377,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { new LinkedBlockingQueue<Runnable>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); + coreThreads = maxThreads; } ThreadPoolExecutor tpe = new ThreadPoolExecutor( coreThreads, @@ -397,9 +398,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { //To start with, threads.max.core threads can hit the meta (including replicas). //After that, requests will get queued up in the passed queue, and only after //the queue is full, a new thread will be started + int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128); this.metaLookupPool = getThreadPool( - conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128), - conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10), + threads, + threads, "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-client/src/test/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/resources/hbase-site.xml b/hbase-client/src/test/resources/hbase-site.xml index 5788238..99d2ab8 100644 --- a/hbase-client/src/test/resources/hbase-site.xml +++ b/hbase-client/src/test/resources/hbase-site.xml @@ -26,10 +26,6 @@ <value>true</value> </property> <property> - <name>hbase.hconnection.meta.lookup.threads.core</name> - <value>4</value> - </property> - <property> <name>hbase.hconnection.threads.keepalivetime</name> <value>3</value> </property> http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java index 1a1044d..9893e7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -107,7 +107,7 @@ public class HFileReplicator { ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setNameFormat("HFileReplicationCallable-%1$d"); this.exec = - new ThreadPoolExecutor(1, maxCopyThreads, 60, TimeUnit.SECONDS, + new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), builder.build()); this.exec.allowCoreThreadTimeOut(true); this.copiesPerThread = http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index da37cfa..3611608 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -235,20 +235,16 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { */ private ExecutorService getDefaultThreadPool(Configuration conf) { int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); - int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16); if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors() * 8; } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); ThreadPoolExecutor tpe = new ThreadPoolExecutor( - coreThreads, + maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java index ed72ea2..6efb10c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java @@ -135,20 +135,16 @@ public class MultiHConnection { private void createBatchPool(Configuration conf) { // Use the same config for keep alive as in ConnectionImplementation.getBatchPool(); int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256); - int coreThreads = conf.getInt("hbase.multihconnection.threads.core", 256); if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors() * 8; } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60); LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); ThreadPoolExecutor tpe = - new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory("MultiHConnection" + "-shared-")); tpe.allowCoreThreadTimeOut(true); this.batchPool = tpe; http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java index 8e96f83..b04689c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckMOB.java @@ -57,7 +57,6 @@ public class TestHBaseFsckMOB extends BaseTestHBaseFsck { conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); - conf.setInt("hbase.hconnection.threads.core", POOL_SIZE); conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); TEST_UTIL.startMiniCluster(1); http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java index d5869ed..a7c0c55 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java @@ -109,7 +109,6 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck { conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); - conf.setInt("hbase.hconnection.threads.core", POOL_SIZE); conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); TEST_UTIL.startMiniCluster(1); http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java index a859f78..2aa436c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplicas.java @@ -64,7 +64,6 @@ public class TestHBaseFsckReplicas extends BaseTestHBaseFsck { conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); - conf.setInt("hbase.hconnection.threads.core", POOL_SIZE); conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); TEST_UTIL.startMiniCluster(3); http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java index 4865217..6d0e48c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckTwoRS.java @@ -73,7 +73,6 @@ public class TestHBaseFsckTwoRS extends BaseTestHBaseFsck { conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); - conf.setInt("hbase.hconnection.threads.core", POOL_SIZE); conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); TEST_UTIL.startMiniCluster(2); http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-server/src/test/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml index 1b429b5..bfe8ee1 100644 --- a/hbase-server/src/test/resources/hbase-site.xml +++ b/hbase-server/src/test/resources/hbase-site.xml @@ -163,10 +163,6 @@ </description> </property> <property> - <name>hbase.hconnection.meta.lookup.threads.core</name> - <value>4</value> - </property> - <property> <name>hbase.hconnection.threads.keepalivetime</name> <value>3</value> </property> http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java index 84613bd..1b8ebc9 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java @@ -132,7 +132,7 @@ public class TBoundedThreadPoolServer extends TServer { } /** Executor service for handling client connections */ - private ExecutorService executorService; + private ThreadPoolExecutor executorService; /** Flag for stopping the server */ private volatile boolean stopped; @@ -142,9 +142,12 @@ public class TBoundedThreadPoolServer extends TServer { public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) { super(options); + int minWorkerThreads = options.minWorkerThreads; + int maxWorkerThreads = options.maxWorkerThreads; if (options.maxQueuedRequests > 0) { this.callQueue = new CallQueue( new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics); + minWorkerThreads = maxWorkerThreads; } else { this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics); } @@ -153,9 +156,10 @@ public class TBoundedThreadPoolServer extends TServer { tfb.setDaemon(true); tfb.setNameFormat("thrift-worker-%d"); executorService = - new ThreadPoolExecutor(options.minWorkerThreads, - options.maxWorkerThreads, options.threadKeepAliveTimeSec, + new ThreadPoolExecutor(minWorkerThreads, + maxWorkerThreads, options.threadKeepAliveTimeSec, TimeUnit.SECONDS, this.callQueue, tfb.build()); + executorService.allowCoreThreadTimeOut(true); serverOptions = options; } http://git-wip-us.apache.org/repos/asf/hbase/blob/7b5d5394/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java index 7d228dc..8767a3c 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -555,7 +555,7 @@ public class ThriftServerRunner implements Runnable { CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics); ExecutorService executorService = createExecutor( - callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads()); + callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads()); serverArgs.executorService(executorService) .processor(processor) .transportFactory(transportFactory) @@ -620,8 +620,10 @@ public class ThriftServerRunner implements Runnable { ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); tfb.setNameFormat("thrift-worker-%d"); - return new ThreadPoolExecutor(minWorkers, maxWorkers, + ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minWorkers, maxWorkers, Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); + threadPool.allowCoreThreadTimeOut(true); + return threadPool; } private InetAddress getBindAddress(Configuration conf)
