Repository: hbase Updated Branches: refs/heads/branch-1.3 439f4a3f6 -> 0cf606115
HBASE-15637 TSHA Thrift-2 server should allow limiting call queue size Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0cf60611 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0cf60611 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0cf60611 Branch: refs/heads/branch-1.3 Commit: 0cf60611541764c5a8d5db4f5eedd0e58aeef42f Parents: 439f4a3 Author: Mikhail Antonov <[email protected]> Authored: Tue Apr 12 14:46:22 2016 -0700 Committer: Mikhail Antonov <[email protected]> Committed: Wed Apr 13 12:34:40 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/thrift2/ThriftServer.java | 23 +++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0cf60611/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java index 941d5f8..a2b4f03 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -141,6 +141,8 @@ public class ThriftServer { options.addOption("f", "framed", false, "Use framed transport"); options.addOption("c", "compact", false, "Use the compact protocol"); options.addOption("w", "workers", true, "How many worker threads to use."); + options.addOption("q", "callQueueSize", true, + "Max size of request queue (unbounded by default)"); options.addOption("h", "help", false, "Print help information"); options.addOption(null, "infoport", true, "Port for web UI"); options.addOption("t", READ_TIMEOUT_OPTION, true, @@ -251,7 +253,7 @@ public class ThriftServer { private static TServer getTHsHaServer(TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, - int workerThreads, + int workerThreads, int maxCallQueueSize, InetSocketAddress inetSocketAddress, ThriftMetrics metrics) throws TTransportException { TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); @@ -262,7 +264,7 @@ public class ThriftServer { serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads); } ExecutorService executorService = createExecutor( - workerThreads, metrics); + workerThreads, maxCallQueueSize, metrics); serverArgs.executorService(executorService); serverArgs.processor(processor); serverArgs.transportFactory(transportFactory); @@ -271,9 +273,14 @@ public class ThriftServer { } private static ExecutorService createExecutor( - int workerThreads, ThriftMetrics metrics) { - CallQueue callQueue = new CallQueue( - new LinkedBlockingQueue<Call>(), metrics); + int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) { + CallQueue callQueue; + if (maxCallQueueSize > 0) { + callQueue = new CallQueue(new LinkedBlockingQueue<Call>(maxCallQueueSize), metrics); + } else { + callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics); + } + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); tfb.setNameFormat("thrift2-worker-%d"); @@ -336,6 +343,7 @@ public class ThriftServer { Configuration conf = HBaseConfiguration.create(); CommandLine cmd = parseArguments(conf, options, args); int workerThreads = 0; + int maxCallQueueSize = -1; // use unbounded queue by default /** * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase @@ -469,6 +477,10 @@ public class ThriftServer { workerThreads = Integer.parseInt(cmd.getOptionValue("w")); } + if (cmd.hasOption("q")) { + maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q")); + } + // check for user-defined info server port setting, if so override the conf try { if (cmd.hasOption("infoport")) { @@ -502,6 +514,7 @@ public class ThriftServer { processor, transportFactory, workerThreads, + maxCallQueueSize, inetSocketAddress, metrics); } else {
