Repository: hbase Updated Branches: refs/heads/branch-1.2 c9f388bec -> 9a5d1b689
HBASE-17255 Backport HBASE-17181 to branch-1.2 Signed-off-by: zhangduo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a5d1b68 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a5d1b68 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a5d1b68 Branch: refs/heads/branch-1.2 Commit: 9a5d1b689f3080e8303dd44b349fc0cdb286daea Parents: c9f388b Author: eyjian <[email protected]> Authored: Mon Dec 5 15:20:20 2016 +0800 Committer: zhangduo <[email protected]> Committed: Tue Dec 6 18:11:02 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/thrift2/ThriftServer.java | 41 ++++++++++++++++++++ 1 file changed, 41 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9a5d1b68/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java index 429475e..c3aa6c3 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -75,6 +75,7 @@ import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerTransport; @@ -141,6 +142,7 @@ public class ThriftServer { options.addOption("f", "framed", false, "Use framed transport"); options.addOption("c", "compact", false, "Use the compact protocol"); options.addOption("w", "workers", true, "How many worker threads to use."); + options.addOption("s", "selectors", true, "How many selector threads to use."); options.addOption("h", "help", false, "Print help information"); options.addOption(null, "infoport", true, "Port for web UI"); options.addOption("t", READ_TIMEOUT_OPTION, true, @@ -151,6 +153,7 @@ public class ThriftServer { servers.addOption( new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport.")); servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport.")); + servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport.")); servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default.")); options.addOptionGroup(servers); return options; @@ -270,6 +273,30 @@ public class ThriftServer { return new THsHaServer(serverArgs); } + private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory, + TProcessor processor, TTransportFactory transportFactory, + int workerThreads, int selectorThreads, + InetSocketAddress inetSocketAddress, ThriftMetrics metrics) + throws TTransportException { + TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); + log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString()); + TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport); + if (workerThreads > 0) { + serverArgs.workerThreads(workerThreads); + } + if (selectorThreads > 0) { + serverArgs.selectorThreads(selectorThreads); + } + + ExecutorService executorService = createExecutor( + workerThreads, metrics); + serverArgs.executorService(executorService); + serverArgs.processor(processor); + serverArgs.transportFactory(transportFactory); + serverArgs.protocolFactory(protocolFactory); + return new TThreadedSelectorServer(serverArgs); + } + private static ExecutorService createExecutor( int workerThreads, ThriftMetrics metrics) { CallQueue callQueue = new CallQueue( @@ -336,6 +363,7 @@ public class ThriftServer { Configuration conf = HBaseConfiguration.create(); CommandLine cmd = parseArguments(conf, options, args); int workerThreads = 0; + int selectorThreads = 0; /** * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase @@ -417,6 +445,7 @@ public class ThriftServer { boolean nonblocking = cmd.hasOption("nonblocking"); boolean hsha = cmd.hasOption("hsha"); + boolean selector = cmd.hasOption("selector"); ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO); @@ -425,6 +454,8 @@ public class ThriftServer { implType = "nonblocking"; } else if (hsha) { implType = "hsha"; + } else if (selector) { + implType = "selector"; } conf.set("hbase.regionserver.thrift.server.type", implType); @@ -468,6 +499,9 @@ public class ThriftServer { if (cmd.hasOption("w")) { workerThreads = Integer.parseInt(cmd.getOptionValue("w")); } + if (cmd.hasOption("s")) { + selectorThreads = Integer.parseInt(cmd.getOptionValue("s")); + } // check for user-defined info server port setting, if so override the conf try { @@ -504,6 +538,13 @@ public class ThriftServer { workerThreads, inetSocketAddress, metrics); + } else if (selector) { + server = getTThreadedSelectorServer(protocolFactory, + processor, + transportFactory, + workerThreads, selectorThreads, + inetSocketAddress, + metrics); } else { server = getTThreadPoolServer(protocolFactory, processor,
