Repository: hbase Updated Branches: refs/heads/branch-1.3 b84e26973 -> 746447cf6
HBASE-18967 Backport HBASE-17181 to branch-1.3 (Let HBase thrift2 support TThreadedSelectorServer) (cherry picked from commit 00b302435536974bc56e72deb406002c3b13669e) Signed-off-by: Chia-Ping Tsai <chia7...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/746447cf Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/746447cf Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/746447cf Branch: refs/heads/branch-1.3 Commit: 746447cf6487da3f0c9ec72b3824e9c99e88430e Parents: b84e269 Author: Peter Somogyi <psomo...@cloudera.com> Authored: Tue Oct 10 15:30:24 2017 -0700 Committer: Chia-Ping Tsai <chia7...@gmail.com> Committed: Fri Nov 17 22:16:17 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/thrift2/ThriftServer.java | 57 +++++++++++++++++--- 1 file changed, 51 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/746447cf/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java index 37bf06d..4c7ca8a 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -48,10 +48,10 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.filter.ParseFilter; import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.security.SaslUtil; @@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.thrift.ThriftMetrics; import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.hadoop.hbase.util.DNS; import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; @@ -77,6 +77,7 @@ import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerTransport; @@ -143,6 +144,7 @@ public class ThriftServer { options.addOption("f", "framed", false, "Use framed transport"); options.addOption("c", "compact", false, "Use the compact protocol"); options.addOption("w", "workers", true, "How many worker threads to use."); + options.addOption("s", "selectors", true, "How many selector threads to use."); options.addOption("q", "callQueueSize", true, "Max size of request queue (unbounded by default)"); options.addOption("h", "help", false, "Print help information"); @@ -153,9 +155,14 @@ public class ThriftServer { "only applies to TBoundedThreadPoolServer"); OptionGroup servers = new OptionGroup(); servers.addOption( - new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport.")); - servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport.")); - servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default.")); + new Option("nonblocking", false, + "Use the TNonblockingServer. This implies the framed transport.")); + servers.addOption(new Option("hsha", false, + "Use the THsHaServer. This implies the framed transport.")); + servers.addOption(new Option("selector", false, + "Use the TThreadedSelectorServer. This implies the framed transport.")); + servers.addOption(new Option("threadpool", false, + "Use the TThreadPoolServer. This is the default.")); options.addOptionGroup(servers); return options; } @@ -275,6 +282,29 @@ public class ThriftServer { return new THsHaServer(serverArgs); } + private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory, + TProcessor processor, TTransportFactory transportFactory, + int workerThreads, int selectorThreads, int maxCallQueueSize, + InetSocketAddress inetSocketAddress, ThriftMetrics metrics) + throws TTransportException { + TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); + log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString()); + TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport); + if (workerThreads > 0) { + serverArgs.workerThreads(workerThreads); + } + if (selectorThreads > 0) { + serverArgs.selectorThreads(selectorThreads); + } + + ExecutorService executorService = createExecutor(workerThreads, maxCallQueueSize, metrics); + serverArgs.executorService(executorService); + serverArgs.processor(processor); + serverArgs.transportFactory(transportFactory); + serverArgs.protocolFactory(protocolFactory); + return new TThreadedSelectorServer(serverArgs); + } + private static ExecutorService createExecutor( int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) { CallQueue callQueue; @@ -352,6 +382,7 @@ public class ThriftServer { Configuration conf = HBaseConfiguration.create(); CommandLine cmd = parseArguments(conf, options, args); int workerThreads = 0; + int selectorThreads = 0; int maxCallQueueSize = -1; // use unbounded queue by default /** @@ -434,6 +465,7 @@ public class ThriftServer { boolean nonblocking = cmd.hasOption("nonblocking"); boolean hsha = cmd.hasOption("hsha"); + boolean selector = cmd.hasOption("selector"); ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO); @@ -442,6 +474,8 @@ public class ThriftServer { implType = "nonblocking"; } else if (hsha) { implType = "hsha"; + } else if (selector) { + implType = "selector"; } conf.set("hbase.regionserver.thrift.server.type", implType); @@ -485,7 +519,9 @@ public class ThriftServer { if (cmd.hasOption("w")) { workerThreads = Integer.parseInt(cmd.getOptionValue("w")); } - + if (cmd.hasOption("s")) { + selectorThreads = Integer.parseInt(cmd.getOptionValue("s")); + } if (cmd.hasOption("q")) { maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q")); } @@ -526,6 +562,15 @@ public class ThriftServer { maxCallQueueSize, inetSocketAddress, metrics); + } else if (selector) { + server = getTThreadedSelectorServer(protocolFactory, + processor, + transportFactory, + workerThreads, + selectorThreads, + maxCallQueueSize, + inetSocketAddress, + metrics); } else { server = getTThreadPoolServer(protocolFactory, processor,