Repository: hbase
Updated Branches:
  refs/heads/master 37b30d93d -> 00b302435


HBASE-17181 Let HBase thrift2 support TThreadedSelectorServer

Signed-off-by: zhangduo <zhang...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/00b30243
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/00b30243
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/00b30243

Branch: refs/heads/master
Commit: 00b302435536974bc56e72deb406002c3b13669e
Parents: 37b30d9
Author: eyjian <eyj...@live.com>
Authored: Wed Nov 30 09:16:01 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Dec 2 10:05:48 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/thrift2/ThriftServer.java      | 48 ++++++++++++++++++--
 1 file changed, 45 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/00b30243/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index 7d94d0e..5d35674 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -47,11 +47,11 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.filter.ParseFilter;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.security.SaslUtil;
@@ -64,8 +64,8 @@ import 
org.apache.hadoop.hbase.thrift2.generated.THBaseService;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.thrift.TException;
@@ -78,6 +78,7 @@ import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -144,6 +145,7 @@ public class ThriftServer extends Configured implements 
Tool {
     options.addOption("f", "framed", false, "Use framed transport");
     options.addOption("c", "compact", false, "Use the compact protocol");
     options.addOption("w", "workers", true, "How many worker threads to use.");
+    options.addOption("s", "selectors", true, "How many selector threads to 
use.");
     options.addOption("q", "callQueueSize", true,
       "Max size of request queue (unbounded by default)");
     options.addOption("h", "help", false, "Print help information");
@@ -156,6 +158,7 @@ public class ThriftServer extends Configured implements 
Tool {
     servers.addOption(
         new Option("nonblocking", false, "Use the TNonblockingServer. This 
implies the framed transport."));
     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This 
implies the framed transport."));
+    servers.addOption(new Option("selector", false, "Use the 
TThreadedSelectorServer. This implies the framed transport."));
     servers.addOption(new Option("threadpool", false, "Use the 
TThreadPoolServer. This is the default."));
     options.addOptionGroup(servers);
     return options;
@@ -273,6 +276,30 @@ public class ThriftServer extends Configured implements 
Tool {
     return new THsHaServer(serverArgs);
   }
 
+  private static TServer getTThreadedSelectorServer(TProtocolFactory 
protocolFactory,
+      TProcessor processor, TTransportFactory transportFactory,
+      int workerThreads, int selectorThreads, int maxCallQueueSize,
+      InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
+      throws TTransportException {
+    TNonblockingServerTransport serverTransport = new 
TNonblockingServerSocket(inetSocketAddress);
+    log.info("starting HBase ThreadedSelector Thrift server on " + 
inetSocketAddress.toString());
+    TThreadedSelectorServer.Args serverArgs = new 
TThreadedSelectorServer.Args(serverTransport);
+    if (workerThreads > 0) {
+        serverArgs.workerThreads(workerThreads);
+    }
+    if (selectorThreads > 0) {
+        serverArgs.selectorThreads(selectorThreads);
+    }
+
+    ExecutorService executorService = createExecutor(
+        workerThreads, maxCallQueueSize, metrics);
+    serverArgs.executorService(executorService);
+    serverArgs.processor(processor);
+    serverArgs.transportFactory(transportFactory);
+    serverArgs.protocolFactory(protocolFactory);
+    return new TThreadedSelectorServer(serverArgs);
+  }
+
   private static ExecutorService createExecutor(
       int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
     CallQueue callQueue;
@@ -350,6 +377,7 @@ public class ThriftServer extends Configured implements 
Tool {
     Options options = getOptions();
     CommandLine cmd = parseArguments(conf, options, args);
     int workerThreads = 0;
+    int selectorThreads = 0;
     int maxCallQueueSize = -1; // use unbounded queue by default
 
     /**
@@ -432,6 +460,7 @@ public class ThriftServer extends Configured implements 
Tool {
 
     boolean nonblocking = cmd.hasOption("nonblocking");
     boolean hsha = cmd.hasOption("hsha");
+    boolean selector = cmd.hasOption("selector");
 
     ThriftMetrics metrics = new ThriftMetrics(conf, 
ThriftMetrics.ThriftServerType.TWO);
     final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, 
metrics.getSource());
@@ -441,6 +470,8 @@ public class ThriftServer extends Configured implements 
Tool {
       implType = "nonblocking";
     } else if (hsha) {
       implType = "hsha";
+    } else if (selector) {
+      implType = "selector";
     }
 
     conf.set("hbase.regionserver.thrift.server.type", implType);
@@ -484,7 +515,9 @@ public class ThriftServer extends Configured implements 
Tool {
     if (cmd.hasOption("w")) {
       workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
     }
-
+    if (cmd.hasOption("s")) {
+      selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
+    }
     if (cmd.hasOption("q")) {
       maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
     }
@@ -525,6 +558,15 @@ public class ThriftServer extends Configured implements 
Tool {
           maxCallQueueSize,
           inetSocketAddress,
           metrics);
+    } else if (selector) {
+      server = getTThreadedSelectorServer(protocolFactory,
+          processor,
+          transportFactory,
+          workerThreads,
+          selectorThreads,
+          maxCallQueueSize,
+          inetSocketAddress,
+          metrics);
     } else {
       server = getTThreadPoolServer(protocolFactory,
           processor,

Reply via email to