Repository: hive
Updated Branches:
  refs/heads/branch-3 cad7a2e61 -> 1c5ce3f61


HIVE-19776 : HiveServer2.startHiveServer2 retries of start has concurrency 
issues (Thejas Nair, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c5ce3f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c5ce3f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c5ce3f6

Branch: refs/heads/branch-3
Commit: 1c5ce3f61e0dbf902cc7c75fb56f9f0b91a36e00
Parents: cad7a2e
Author: Thejas M Nair <[email protected]>
Authored: Fri Jun 8 15:37:22 2018 -0700
Committer: Thejas M Nair <[email protected]>
Committed: Fri Jun 8 15:37:30 2018 -0700

----------------------------------------------------------------------
 .../cli/thrift/ThriftBinaryCLIService.java      | 78 ++++++++++-------
 .../service/cli/thrift/ThriftCLIService.java    | 28 +++---
 .../cli/thrift/ThriftHttpCLIService.java        | 90 +++++++++++++-------
 3 files changed, 118 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1c5ce3f6/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index 67c73e2..df2d3a7 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -40,15 +40,18 @@ import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
 
 public class ThriftBinaryCLIService extends ThriftCLIService {
   private final Runnable oomHook;
+  protected TServer server;
 
   public ThriftBinaryCLIService(CLIService cliService, Runnable oomHook) {
     super(cliService, ThriftBinaryCLIService.class.getSimpleName());
@@ -56,14 +59,13 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
   }
 
   @Override
-  public void run() {
+  protected void initServer() {
     try {
       // Server thread pool
       String threadPoolName = "HiveServer2-Handler-Pool";
-      ExecutorService executorService = new 
ThreadPoolExecutorWithOomHook(minWorkerThreads,
-          maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS,
-          new SynchronousQueue<Runnable>(), new 
ThreadFactoryWithGarbageCleanup(threadPoolName),
-          oomHook);
+      ExecutorService executorService = new 
ThreadPoolExecutorWithOomHook(minWorkerThreads, maxWorkerThreads,
+          workerKeepAliveTime, TimeUnit.SECONDS, new 
SynchronousQueue<Runnable>(),
+          new ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook);
 
       // Thrift configs
       hiveAuthFactory = new HiveAuthFactory(hiveConf);
@@ -79,35 +81,32 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
       } else {
         String keyStorePath = 
hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
         if (keyStorePath.isEmpty()) {
-          throw new 
IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
-              + " Not configured for SSL connection");
+          throw new IllegalArgumentException(
+              ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + " Not configured for SSL connection");
         }
         String keyStorePassword = 
ShimLoader.getHadoopShims().getPassword(hiveConf,
             HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
-        serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, 
keyStorePath,
-            keyStorePassword, sslVersionBlacklist);
+        serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, 
keyStorePath, keyStorePassword,
+            sslVersionBlacklist);
       }
 
       // Server args
       int maxMessageSize = 
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
-      int requestTimeout = (int) hiveConf.getTimeVar(
-          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, 
TimeUnit.SECONDS);
-      int beBackoffSlotLength = (int) hiveConf.getTimeVar(
-          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, 
TimeUnit.MILLISECONDS);
-      TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
-          
.processorFactory(processorFactory).transportFactory(transportFactory)
-          .protocolFactory(new TBinaryProtocol.Factory())
+      int requestTimeout = (int) 
hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT,
+          TimeUnit.SECONDS);
+      int beBackoffSlotLength = (int) hiveConf
+          
.getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, 
TimeUnit.MILLISECONDS);
+      TThreadPoolServer.Args sargs = new 
TThreadPoolServer.Args(serverSocket).processorFactory(processorFactory)
+          .transportFactory(transportFactory).protocolFactory(new 
TBinaryProtocol.Factory())
           .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, 
maxMessageSize, maxMessageSize))
-          .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
-          
.beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
-          .executorService(executorService);
+          
.requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS).beBackoffSlotLength(beBackoffSlotLength)
+          
.beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS).executorService(executorService);
 
       // TCP Server
       server = new TThreadPoolServer(sargs);
       server.setServerEventHandler(new TServerEventHandler() {
         @Override
-        public ServerContext createContext(
-          TProtocol input, TProtocol output) {
+        public ServerContext createContext(TProtocol input, TProtocol output) {
           Metrics metrics = MetricsFactory.getInstance();
           if (metrics != null) {
             try {
@@ -121,8 +120,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
         }
 
         @Override
-        public void deleteContext(ServerContext serverContext,
-          TProtocol input, TProtocol output) {
+        public void deleteContext(ServerContext serverContext, TProtocol 
input, TProtocol output) {
           Metrics metrics = MetricsFactory.getInstance();
           if (metrics != null) {
             try {
@@ -137,7 +135,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
             LOG.info("Session disconnected without closing properly. ");
             try {
               boolean close = 
cliService.getSessionManager().getSession(sessionHandle).getHiveConf()
-                .getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT);
+                  
.getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT);
               LOG.info((close ? "" : "Not ") + "Closing the session: " + 
sessionHandle);
               if (close) {
                 cliService.closeSession(sessionHandle);
@@ -153,21 +151,39 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
         }
 
         @Override
-        public void processContext(ServerContext serverContext,
-          TTransport input, TTransport output) {
+        public void processContext(ServerContext serverContext, TTransport 
input, TTransport output) {
           currentServerContext.set(serverContext);
         }
       });
-      String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
-          + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
+      String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with "
+          + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
       LOG.info(msg);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to init thrift server", e);
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
       server.serve();
     } catch (Throwable t) {
-      LOG.error(
-          "Error starting HiveServer2: could not start "
-              + ThriftBinaryCLIService.class.getSimpleName(), t);
+      if (t instanceof InterruptedException) {
+        // This is likely a shutdown
+        LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down.");
+      } else {
+        LOG.error("Exception caught by " + this.getClass().getSimpleName() +
+            ". Exiting.", t);
+      }
       System.exit(-1);
     }
   }
 
+  @Override
+  protected void stopServer() {
+    server.stop();
+    server = null;
+    LOG.info("Thrift server has stopped");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1c5ce3f6/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index c64c991..5481b90 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -133,9 +133,6 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
   protected int portNum;
   protected InetAddress serverIPAddress;
   protected String hiveHost;
-  protected TServer server;
-  protected org.eclipse.jetty.server.Server httpServer;
-
   private boolean isStarted = false;
   protected boolean isEmbedded = false;
 
@@ -144,6 +141,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
   protected int minWorkerThreads;
   protected int maxWorkerThreads;
   protected long workerKeepAliveTime;
+  private Thread serverThread;
 
   protected ThreadLocal<ServerContext> currentServerContext;
 
@@ -209,30 +207,30 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
     super.init(hiveConf);
   }
 
+  protected abstract void initServer();
+
   @Override
   public synchronized void start() {
     super.start();
     if (!isStarted && !isEmbedded) {
-      new Thread(this).start();
+      initServer();
+      serverThread = new Thread(this);
+      serverThread.setName("Thrift Server");
+      serverThread.start();
       isStarted = true;
     }
   }
 
+  protected abstract void stopServer();
+
   @Override
   public synchronized void stop() {
     if (isStarted && !isEmbedded) {
-      if(server != null) {
-        server.stop();
-        LOG.info("Thrift server has stopped");
-      }
-      if((httpServer != null) && httpServer.isStarted()) {
-        try {
-          httpServer.stop();
-          LOG.info("Http server has stopped");
-        } catch (Exception e) {
-          LOG.error("Error stopping Http server: ", e);
-        }
+      if (serverThread != null) {
+        serverThread.interrupt();
+        serverThread = null;
       }
+      stopServer();
       isStarted = false;
     }
     super.stop();

http://git-wip-us.apache.org/repos/asf/hive/blob/1c5ce3f6/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
index 0b3f2c3..d4ea7ab 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -56,9 +56,9 @@ import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 
 public class ThriftHttpCLIService extends ThriftCLIService {
   private static final String APPLICATION_THRIFT = "application/x-thrift";
+  protected org.eclipse.jetty.server.Server server;
 
   private final Runnable oomHook;
-
   public ThriftHttpCLIService(CLIService cliService, Runnable oomHook) {
     super(cliService, ThriftHttpCLIService.class.getSimpleName());
     this.oomHook = oomHook;
@@ -66,23 +66,23 @@ public class ThriftHttpCLIService extends ThriftCLIService {
 
   /**
    * Configure Jetty to serve http requests. Example of a client connection 
URL:
-   * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual 
target URL to differ,
-   * e.g. http://gateway:port/hive2/servlets/thrifths2/
+   * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual 
target
+   * URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/
    */
   @Override
-  public void run() {
+  protected void initServer() {
     try {
       // Server thread pool
-      // Start with minWorkerThreads, expand till maxWorkerThreads and reject 
subsequent requests
+      // Start with minWorkerThreads, expand till maxWorkerThreads and reject
+      // subsequent requests
       String threadPoolName = "HiveServer2-HttpHandler-Pool";
       ExecutorService executorService = new 
ThreadPoolExecutorWithOomHook(minWorkerThreads,
-          maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new 
SynchronousQueue<Runnable>(),
-          new ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook);
+          maxWorkerThreads,workerKeepAliveTime, TimeUnit.SECONDS,
+          new SynchronousQueue<Runnable>(), new 
ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook);
       ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
 
       // HTTP Server
-      httpServer = new Server(threadPool);
-
+      server = new Server(threadPool);
 
       ServerConnector connector;
 
@@ -105,20 +105,21 @@ public class ThriftHttpCLIService extends ThriftCLIService {
         String keyStorePassword = 
ShimLoader.getHadoopShims().getPassword(hiveConf,
             HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
         if (keyStorePath.isEmpty()) {
-          throw new 
IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+          throw new IllegalArgumentException(
+              ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname 
               + " Not configured for SSL connection");
         }
         SslContextFactory sslContextFactory = new SslContextFactory();
         String[] excludedProtocols = 
hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",");
         LOG.info("HTTP Server SSL: adding excluded protocols: " + 
Arrays.toString(excludedProtocols));
         sslContextFactory.addExcludeProtocols(excludedProtocols);
-        LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " +
-          Arrays.toString(sslContextFactory.getExcludeProtocols()));
+        LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = "
+            + Arrays.toString(sslContextFactory.getExcludeProtocols()));
         sslContextFactory.setKeyStorePath(keyStorePath);
         sslContextFactory.setKeyStorePassword(keyStorePassword);
-        connector = new ServerConnector(httpServer, sslContextFactory, http);
+        connector = new ServerConnector(server, sslContextFactory, http);
       } else {
-        connector = new ServerConnector(httpServer, http);
+        connector = new ServerConnector(server, http);
       }
 
       connector.setPort(portNum);
@@ -128,7 +129,7 @@ public class ThriftHttpCLIService extends ThriftCLIService {
           TimeUnit.MILLISECONDS);
       connector.setIdleTimeout(maxIdleTime);
 
-      httpServer.addConnector(connector);
+      server.addConnector(connector);
 
       // Thrift configs
       hiveAuthFactory = new HiveAuthFactory(hiveConf);
@@ -140,16 +141,15 @@ public class ThriftHttpCLIService extends ThriftCLIService {
       // UGI for the http/_HOST (SPNego) principal
       UserGroupInformation httpUGI = cliService.getHttpUGI();
       String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
-      TServlet thriftHttpServlet = new ThriftHttpServlet(processor, 
protocolFactory, authType,
-          serviceUGI, httpUGI, hiveAuthFactory);
+      TServlet thriftHttpServlet = new ThriftHttpServlet(processor, 
protocolFactory, authType, serviceUGI, httpUGI,
+          hiveAuthFactory);
 
       // Context handler
-      final ServletContextHandler context = new ServletContextHandler(
-          ServletContextHandler.SESSIONS);
+      final ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
       context.setContextPath("/");
-      if 
(hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname, false)){
+      if 
(hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname, false)) 
{
         // context.addFilter(Utils.getXSRFFilterHolder(null, null), "/" ,
-        //    FilterMapping.REQUEST);
+        // FilterMapping.REQUEST);
         // Filtering does not work here currently, doing filter in 
ThriftHttpServlet
         LOG.debug("XSRF filter enabled");
       } else {
@@ -183,33 +183,45 @@ public class ThriftHttpCLIService extends ThriftCLIService {
         }
       });
 
-      final String httpPath = getHttpPath(hiveConf
-          .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
+      final String httpPath = 
getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
 
       if (HiveConf.getBoolVar(hiveConf, 
ConfVars.HIVE_SERVER2_THRIFT_HTTP_COMPRESSION_ENABLED)) {
         final GzipHandler gzipHandler = new GzipHandler();
         gzipHandler.setHandler(context);
         gzipHandler.addIncludedMethods(HttpMethod.POST);
         gzipHandler.addIncludedMimeTypes(APPLICATION_THRIFT);
-        httpServer.setHandler(gzipHandler);
+        server.setHandler(gzipHandler);
       } else {
-        httpServer.setHandler(context);
+        server.setHandler(context);
       }
       context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
 
-      // TODO: check defaults: maxTimeout, keepalive, maxBodySize, 
bodyRecieveDuration, etc.
+      // TODO: check defaults: maxTimeout, keepalive, maxBodySize,
+      // bodyRecieveDuration, etc.
       // Finally, start the server
-      httpServer.start();
+      server.start();
       String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
           + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
           + maxWorkerThreads + " worker threads";
       LOG.info(msg);
-      httpServer.join();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to init HttpServer", e);
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      server.join();
     } catch (Throwable t) {
-      LOG.error(
-          "Error starting HiveServer2: could not start "
-              + ThriftHttpCLIService.class.getSimpleName(), t);
-      System.exit(-1);
+      if (t instanceof InterruptedException) {
+        // This is likely a shutdown
+        LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down thrift server.");
+      } else {
+        LOG.error("Exception caught by " + 
ThriftHttpCLIService.class.getSimpleName() +
+            ". Exiting.", t);
+        System.exit(-1);
+      }
     }
   }
 
@@ -236,4 +248,18 @@ public class ThriftHttpCLIService extends ThriftCLIService {
     }
     return httpPath;
   }
+
+  @Override
+  protected void stopServer() {
+    if((server != null) && server.isStarted()) {
+      try {
+        server.stop();
+        server = null;
+        LOG.info("Thrift HTTP server has been stopped");
+      } catch (Exception e) {
+        LOG.error("Error stopping HTTP server: ", e);
+      }
+    }
+  }
+
 }

Reply via email to