http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/7d8fa69f/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
 
b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
new file mode 100644
index 0000000..e16c6de
--- /dev/null
+++ 
b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -0,0 +1,871 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.hive.service.rpc.thrift.TSetClientInfoReq;
+import org.apache.hive.service.rpc.thrift.TSetClientInfoResp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.login.LoginException;
+import org.apache.hadoop.hive.common.ServerUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
+import org.apache.hive.service.ServiceUtils;
+import org.apache.hive.service.auth.HiveAuthConstants;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.TSetIpAddressProcessor;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.FetchType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.JobProgressUpdate;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.OperationType;
+import org.apache.hive.service.cli.ProgressMonitorStatusMapper;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
+import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
+import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
+import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
+import org.apache.hive.service.rpc.thrift.TCloseSessionResp;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
+import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
+import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsReq;
+import org.apache.hive.service.rpc.thrift.TGetCatalogsResp;
+import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
+import org.apache.hive.service.rpc.thrift.TGetColumnsResp;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceResp;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
+import org.apache.hive.service.rpc.thrift.TGetFunctionsResp;
+import org.apache.hive.service.rpc.thrift.TGetInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetInfoResp;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysResp;
+import org.apache.hive.service.rpc.thrift.TGetQueryIdReq;
+import org.apache.hive.service.rpc.thrift.TGetQueryIdResp;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
+import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
+import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
+import org.apache.hive.service.rpc.thrift.TGetSchemasResp;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesReq;
+import org.apache.hive.service.rpc.thrift.TGetTableTypesResp;
+import org.apache.hive.service.rpc.thrift.TGetTablesReq;
+import org.apache.hive.service.rpc.thrift.TGetTablesResp;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
+import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
+import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
+import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
+import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TException;
+import org.apache.thrift.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.thriftserver.LivyCLIService;
+import org.apache.livy.thriftserver.SessionInfo;
+
+
+/**
+ * ThriftCLIService.
+ *
+ */
+public abstract class ThriftCLIService extends AbstractService implements 
TCLIService.Iface, Runnable {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(ThriftCLIService.class.getName());
+
+  protected ICLIService cliService;
+  private static final TStatus OK_STATUS = new 
TStatus(TStatusCode.SUCCESS_STATUS);
+  protected static HiveAuthFactory hiveAuthFactory;
+
+  protected int portNum;
+  protected InetAddress serverIPAddress;
+  protected String hiveHost;
+  private boolean isStarted = false;
+  protected boolean isEmbedded = false;
+
+  protected HiveConf hiveConf;
+
+  protected int minWorkerThreads;
+  protected int maxWorkerThreads;
+  protected long workerKeepAliveTime;
+  private Thread serverThread;
+
+  protected ThreadLocal<ServerContext> currentServerContext;
+
+  static class ThriftCLIServerContext implements ServerContext {
+    private SessionHandle sessionHandle = null;
+
+    public void setSessionHandle(SessionHandle sessionHandle) {
+      this.sessionHandle = sessionHandle;
+    }
+
+    public SessionHandle getSessionHandle() {
+      return sessionHandle;
+    }
+  }
+
+  public ThriftCLIService(ICLIService service, String serviceName) {
+    super(serviceName);
+    this.cliService = service;
+    currentServerContext = new ThreadLocal<ServerContext>();
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+
+    String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+    if (hiveHost == null) {
+      hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+    }
+    try {
+      serverIPAddress = ServerUtils.getHostAddress(hiveHost);
+    } catch (UnknownHostException e) {
+      throw new ServiceException(e);
+    }
+
+    // Initialize common server configs needed in both binary & http modes
+    String portString;
+    // HTTP mode
+    if (HiveServer2.isHTTPTransportMode(hiveConf)) {
+      workerKeepAliveTime =
+          
hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
+              TimeUnit.SECONDS);
+      portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
+      if (portString != null) {
+        portNum = Integer.parseInt(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
+      }
+    }
+    // Binary mode
+    else {
+      workerKeepAliveTime =
+          
hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, 
TimeUnit.SECONDS);
+      portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+      if (portString != null) {
+        portNum = Integer.parseInt(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+      }
+    }
+    minWorkerThreads = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+    maxWorkerThreads = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+    super.init(hiveConf);
+  }
+
+  protected abstract void initServer();
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    if (!isStarted && !isEmbedded) {
+      initServer();
+      serverThread = new Thread(this);
+      serverThread.setName("Thrift Server");
+      serverThread.start();
+      isStarted = true;
+    }
+  }
+
+  protected abstract void stopServer();
+
+  @Override
+  public synchronized void stop() {
+    if (isStarted && !isEmbedded) {
+      if (serverThread != null) {
+        serverThread.interrupt();
+        serverThread = null;
+      }
+      stopServer();
+      isStarted = false;
+    }
+    super.stop();
+  }
+
+  public int getPortNumber() {
+    return portNum;
+  }
+
+  public InetAddress getServerIPAddress() {
+    return serverIPAddress;
+  }
+
+  @Override
+  public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
+      throws TException {
+    TGetDelegationTokenResp resp = new TGetDelegationTokenResp();
+
+    if (hiveAuthFactory == null || !hiveAuthFactory.isSASLKerberosUser()) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        String token = cliService.getDelegationToken(
+            new SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getOwner(), req.getRenewer());
+        resp.setDelegationToken(token);
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error obtaining delegation token", e);
+        TStatus tokenErrorStatus = HiveSQLException.toTStatus(e);
+        tokenErrorStatus.setSqlState("42000");
+        resp.setStatus(tokenErrorStatus);
+      }
+    }
+    return resp;
+  }
+
+  @Override
+  public TCancelDelegationTokenResp 
CancelDelegationToken(TCancelDelegationTokenReq req)
+      throws TException {
+    TCancelDelegationTokenResp resp = new TCancelDelegationTokenResp();
+
+    if (hiveAuthFactory == null || !hiveAuthFactory.isSASLKerberosUser()) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        cliService.cancelDelegationToken(new 
SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getDelegationToken());
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error canceling delegation token", e);
+        resp.setStatus(HiveSQLException.toTStatus(e));
+      }
+    }
+    return resp;
+  }
+
+  @Override
+  public TRenewDelegationTokenResp 
RenewDelegationToken(TRenewDelegationTokenReq req)
+      throws TException {
+    TRenewDelegationTokenResp resp = new TRenewDelegationTokenResp();
+    if (hiveAuthFactory == null || !hiveAuthFactory.isSASLKerberosUser()) {
+      resp.setStatus(unsecureTokenErrorStatus());
+    } else {
+      try {
+        cliService.renewDelegationToken(new 
SessionHandle(req.getSessionHandle()),
+            hiveAuthFactory, req.getDelegationToken());
+        resp.setStatus(OK_STATUS);
+      } catch (HiveSQLException e) {
+        LOG.error("Error obtaining renewing token", e);
+        resp.setStatus(HiveSQLException.toTStatus(e));
+      }
+    }
+    return resp;
+  }
+
+  private TStatus unsecureTokenErrorStatus() {
+    TStatus errorStatus = new TStatus(TStatusCode.ERROR_STATUS);
+    errorStatus.setErrorMessage("Delegation token only supported over remote " 
+
+        "client with kerberos authentication");
+    return errorStatus;
+  }
+
+  @Override
+  public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
+    LOG.info("Client protocol version: " + req.getClient_protocol());
+    TOpenSessionResp resp = new TOpenSessionResp();
+    try {
+      SessionHandle sessionHandle = getSessionHandle(req, resp);
+      resp.setSessionHandle(sessionHandle.toTSessionHandle());
+      Map<String, String> configurationMap = new HashMap<String, String>();
+      // Set the updated fetch size from the server into the configuration map 
for the client
+      String defaultFetchSize = Integer.toString(
+          
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE));
+      configurationMap.put(
+        
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname,
+          defaultFetchSize);
+      resp.setConfiguration(configurationMap);
+      resp.setStatus(OK_STATUS);
+      ThriftCLIServerContext context =
+        (ThriftCLIServerContext)currentServerContext.get();
+      if (context != null) {
+        context.setSessionHandle(sessionHandle);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error opening session: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TSetClientInfoResp SetClientInfo(TSetClientInfoReq req) throws 
TException {
+    // TODO: We don't do anything for now, just log this for debugging.
+    //       We may be able to make use of this later, e.g. for workload 
management.
+    TSetClientInfoResp resp = null;
+    if (req.isSetConfiguration()) {
+      StringBuilder sb = null;
+      SessionHandle sh = null;
+      for (Map.Entry<String, String> e : req.getConfiguration().entrySet()) {
+        if (sb == null) {
+          sh = new SessionHandle(req.getSessionHandle());
+          sb = new StringBuilder("Client information for 
").append(sh).append(": ");
+        } else {
+          sb.append(", ");
+        }
+        sb.append(e.getKey()).append(" = ").append(e.getValue());
+        if ("ApplicationName".equals(e.getKey())) {
+          try {
+            cliService.setApplicationName(sh, e.getValue());
+          } catch (Exception ex) {
+            LOG.warn("Error setting application name", ex);
+            resp = new TSetClientInfoResp(HiveSQLException.toTStatus(ex));
+          }
+        }
+      }
+      if (sb != null) {
+        LOG.info("{}", sb);
+      }
+    }
+    return resp == null ? new TSetClientInfoResp(OK_STATUS) : resp;
+  }
+
+  private String getIpAddress() {
+    String clientIpAddress;
+    // Http transport mode.
+    // We set the thread local ip address, in ThriftHttpServlet.
+    if (hiveConf.getVar(
+        ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+      clientIpAddress = SessionInfo.getIpAddress();
+    }
+    else {
+      if (hiveAuthFactory != null && 
hiveAuthFactory.isSASLWithKerberizedHadoop()) {
+        clientIpAddress = hiveAuthFactory.getIpAddress();
+      }
+      // NOSASL
+      else {
+        clientIpAddress = TSetIpAddressProcessor.getUserIpAddress();
+      }
+    }
+    LOG.debug("Client's IP Address: " + clientIpAddress);
+    return clientIpAddress;
+  }
+
+  /**
+   * Returns the effective username.
+   * 1. If hive.server2.allow.user.substitution = false: the username of the 
connecting user
+   * 2. If hive.server2.allow.user.substitution = true: the username of the 
end user,
+   * that the connecting user is trying to proxy for.
+   * This includes a check whether the connecting user is allowed to proxy for 
the end user.
+   * @param req
+   * @return
+   * @throws HiveSQLException
+   */
+  private String getUserName(TOpenSessionReq req) throws HiveSQLException, 
IOException {
+    String userName = null;
+
+    if (hiveAuthFactory != null && 
hiveAuthFactory.isSASLWithKerberizedHadoop()) {
+      userName = hiveAuthFactory.getRemoteUser();
+    }
+    // NOSASL
+    if (userName == null) {
+      userName = TSetIpAddressProcessor.getUserName();
+    }
+    // Http transport mode.
+    // We set the thread local username, in ThriftHttpServlet.
+    if (hiveConf.getVar(
+        ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
+      userName = SessionInfo.getUserName();
+    }
+    if (userName == null) {
+      userName = req.getUsername();
+    }
+
+    userName = getShortName(userName);
+    String effectiveClientUser = getProxyUser(userName, 
req.getConfiguration(), getIpAddress());
+    LOG.debug("Client's username: " + effectiveClientUser);
+    return effectiveClientUser;
+  }
+
+  private String getShortName(String userName) throws IOException {
+    String ret = null;
+
+    if (userName != null) {
+      if (hiveAuthFactory != null && hiveAuthFactory.isSASLKerberosUser()) {
+        // KerberosName.getShorName can only be used for kerberos user, but 
not for the user
+        // logged in via other authentications such as LDAP
+        KerberosNameShim fullKerberosName = 
ShimLoader.getHadoopShims().getKerberosNameShim(userName);
+        ret = fullKerberosName.getShortName();
+      } else {
+        int indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName);
+        ret = (indexOfDomainMatch <= 0) ? userName : userName.substring(0, 
indexOfDomainMatch);
+      }
+    }
+
+    return ret;
+  }
+
+  /**
+   * Create a session handle
+   * @param req
+   * @param res
+   * @return
+   * @throws HiveSQLException
+   * @throws LoginException
+   * @throws IOException
+   */
+  SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res)
+      throws HiveSQLException, LoginException, IOException {
+    String userName = getUserName(req);
+    String ipAddress = getIpAddress();
+    TProtocolVersion protocol = getMinVersion(LivyCLIService.SERVER_VERSION(),
+        req.getClient_protocol());
+    SessionHandle sessionHandle;
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+        (userName != null)) {
+      sessionHandle = ((LivyCLIService) 
cliService).openSessionWithImpersonation(protocol,
+          userName, req.getPassword(), ipAddress, req.getConfiguration(), 
null);
+    } else {
+      sessionHandle = ((LivyCLIService) cliService).openSession(protocol, 
userName,
+          req.getPassword(), ipAddress, req.getConfiguration());
+    }
+    res.setServerProtocolVersion(protocol);
+    return sessionHandle;
+  }
+
+  private double getProgressedPercentage(OperationHandle opHandle) throws 
HiveSQLException {
+    
checkArgument(OperationType.EXECUTE_STATEMENT.equals(opHandle.getOperationType()));
+    return 0.0;
+  }
+
+  private TProtocolVersion getMinVersion(TProtocolVersion... versions) {
+    TProtocolVersion[] values = TProtocolVersion.values();
+    int current = values[values.length - 1].getValue();
+    for (TProtocolVersion version : versions) {
+      if (current > version.getValue()) {
+        current = version.getValue();
+      }
+    }
+    for (TProtocolVersion version : values) {
+      if (version.getValue() == current) {
+        return version;
+      }
+    }
+    throw new IllegalArgumentException("never");
+  }
+
+  @Override
+  public TCloseSessionResp CloseSession(TCloseSessionReq req) throws 
TException {
+    TCloseSessionResp resp = new TCloseSessionResp();
+    try {
+      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+      cliService.closeSession(sessionHandle);
+      resp.setStatus(OK_STATUS);
+      ThriftCLIServerContext context =
+        (ThriftCLIServerContext)currentServerContext.get();
+      if (context != null) {
+        context.setSessionHandle(null);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing session: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
+    TGetInfoResp resp = new TGetInfoResp();
+    try {
+      GetInfoValue getInfoValue =
+          cliService.getInfo(new SessionHandle(req.getSessionHandle()),
+              GetInfoType.getGetInfoType(req.getInfoType()));
+      resp.setInfoValue(getInfoValue.toTGetInfoValue());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting info: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) 
throws TException {
+    TExecuteStatementResp resp = new TExecuteStatementResp();
+    try {
+      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
+      String statement = req.getStatement();
+      Map<String, String> confOverlay = req.getConfOverlay();
+      Boolean runAsync = req.isRunAsync();
+      long queryTimeout = req.getQueryTimeout();
+      OperationHandle operationHandle =
+          runAsync ? cliService.executeStatementAsync(sessionHandle, 
statement, confOverlay,
+              queryTimeout) : cliService.executeStatement(sessionHandle, 
statement, confOverlay,
+              queryTimeout);
+      resp.setOperationHandle(operationHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      // Note: it's rather important that this (and other methods) catch 
Exception, not Throwable;
+      // in combination with HiveSessionProxy.invoke code, perhaps 
unintentionally, it used
+      // to also catch all errors; and now it allows OOMs only to propagate.
+      LOG.warn("Error executing statement: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
+    TGetTypeInfoResp resp = new TGetTypeInfoResp();
+    try {
+      OperationHandle operationHandle = cliService.getTypeInfo(new 
SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(operationHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting type info: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
+    TGetCatalogsResp resp = new TGetCatalogsResp();
+    try {
+      OperationHandle opHandle = cliService.getCatalogs(new 
SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting catalogs: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
+    TGetSchemasResp resp = new TGetSchemasResp();
+    try {
+      OperationHandle opHandle = cliService.getSchemas(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(), 
req.getSchemaName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting schemas: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
+    TGetTablesResp resp = new TGetTablesResp();
+    try {
+      OperationHandle opHandle = cliService
+          .getTables(new SessionHandle(req.getSessionHandle()), 
req.getCatalogName(),
+              req.getSchemaName(), req.getTableName(), req.getTableTypes());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting tables: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws 
TException {
+    TGetTableTypesResp resp = new TGetTableTypesResp();
+    try {
+      OperationHandle opHandle = cliService.getTableTypes(new 
SessionHandle(req.getSessionHandle()));
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting table types: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
+    TGetColumnsResp resp = new TGetColumnsResp();
+    try {
+      OperationHandle opHandle = cliService.getColumns(
+          new SessionHandle(req.getSessionHandle()),
+          req.getCatalogName(),
+          req.getSchemaName(),
+          req.getTableName(),
+          req.getColumnName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting columns: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws 
TException {
+    TGetFunctionsResp resp = new TGetFunctionsResp();
+    try {
+      OperationHandle opHandle = cliService.getFunctions(
+          new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+          req.getSchemaName(), req.getFunctionName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting functions: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq 
req) throws TException {
+    TGetOperationStatusResp resp = new TGetOperationStatusResp();
+    OperationHandle operationHandle = new 
OperationHandle(req.getOperationHandle());
+    try {
+      OperationStatus operationStatus =
+          cliService.getOperationStatus(operationHandle, 
req.isGetProgressUpdate());
+
+      resp.setOperationState(operationStatus.getState().toTOperationState());
+      resp.setErrorMessage(operationStatus.getState().getErrorMessage());
+      HiveSQLException opException = operationStatus.getOperationException();
+      resp.setTaskStatus(operationStatus.getTaskStatus());
+      resp.setOperationStarted(operationStatus.getOperationStarted());
+      resp.setOperationCompleted(operationStatus.getOperationCompleted());
+      resp.setHasResultSet(operationStatus.getHasResultSet());
+      JobProgressUpdate progressUpdate = operationStatus.jobProgressUpdate();
+      ProgressMonitorStatusMapper mapper = ProgressMonitorStatusMapper.DEFAULT;
+
+      TJobExecutionStatus executionStatus =
+          mapper.forStatus(progressUpdate.status);
+      resp.setProgressUpdateResponse(new TProgressUpdateResp(
+          progressUpdate.headers(),
+          progressUpdate.rows(),
+          progressUpdate.progressedPercentage,
+          executionStatus,
+          progressUpdate.footerSummary,
+          progressUpdate.startTimeMillis
+      ));
+      if (opException != null) {
+        resp.setSqlState(opException.getSQLState());
+        resp.setErrorCode(opException.getErrorCode());
+        if (opException.getErrorCode() == 29999)
+          
resp.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(opException));
+        else
+          resp.setErrorMessage(opException.getMessage());
+      } else if (executionStatus == TJobExecutionStatus.NOT_AVAILABLE
+          && 
OperationType.EXECUTE_STATEMENT.equals(operationHandle.getOperationType())) {
+        resp.getProgressUpdateResponse().setProgressedPercentage(
+            getProgressedPercentage(operationHandle));
+      }
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting operation status: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws 
TException {
+    TCancelOperationResp resp = new TCancelOperationResp();
+    try {
+      cliService.cancelOperation(new 
OperationHandle(req.getOperationHandle()));
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error cancelling operation: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws 
TException {
+    TCloseOperationResp resp = new TCloseOperationResp();
+    try {
+      cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error closing operation: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetResultSetMetadataResp 
GetResultSetMetadata(TGetResultSetMetadataReq req)
+      throws TException {
+    TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
+    try {
+      TableSchema schema = cliService.getResultSetMetadata(new 
OperationHandle(req.getOperationHandle()));
+      resp.setSchema(schema.toTTableSchema());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting result set metadata: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TFetchResultsResp FetchResults(TFetchResultsReq req) throws 
TException {
+    TFetchResultsResp resp = new TFetchResultsResp();
+    try {
+      // Set fetch size
+      int maxFetchSize =
+        
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+      if (req.getMaxRows() > maxFetchSize) {
+        req.setMaxRows(maxFetchSize);
+      }
+      RowSet rowSet = cliService.fetchResults(
+          new OperationHandle(req.getOperationHandle()),
+          FetchOrientation.getFetchOrientation(req.getOrientation()),
+          req.getMaxRows(),
+          FetchType.getFetchType(req.getFetchType()));
+      resp.setResults(rowSet.toTRowSet());
+      resp.setHasMoreRows(false);
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error fetching results: ", e);
+      resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq req)
+               throws TException {
+    TGetPrimaryKeysResp resp = new TGetPrimaryKeysResp();
+    try {
+      OperationHandle opHandle = cliService.getPrimaryKeys(
+      new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
+      req.getSchemaName(), req.getTableName());
+      resp.setOperationHandle(opHandle.toTOperationHandle());
+      resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+     LOG.warn("Error getting functions: ", e);
+     resp.setStatus(HiveSQLException.toTStatus(e));
+    }
+    return resp;
+  }
+
+  @Override
+  public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req)
+               throws TException {
+    TGetCrossReferenceResp resp = new TGetCrossReferenceResp();
+    try {
+      OperationHandle opHandle = cliService.getCrossReference(
+        new SessionHandle(req.getSessionHandle()), req.getParentCatalogName(),
+             req.getParentSchemaName(), req.getParentTableName(),
+          req.getForeignCatalogName(), req.getForeignSchemaName(), 
req.getForeignTableName());
+          resp.setOperationHandle(opHandle.toTOperationHandle());
+          resp.setStatus(OK_STATUS);
+    } catch (Exception e) {
+      LOG.warn("Error getting functions: ", e);
+         resp.setStatus(HiveSQLException.toTStatus(e));
+       }
+    return resp;
+  }
+
+  @Override
+  public TGetQueryIdResp GetQueryId(TGetQueryIdReq req) throws TException {
+    try {
+      return new 
TGetQueryIdResp(cliService.getQueryId(req.getOperationHandle()));
+    } catch (HiveSQLException e) {
+      throw new TException(e);
+    }
+  }
+
+  @Override
+  public abstract void run();
+
+  /**
+   * If the proxy user name is provided then check privileges to substitute 
the user.
+   * @param realUser
+   * @param sessionConf
+   * @param ipAddress
+   * @return
+   * @throws HiveSQLException
+   */
+  private String getProxyUser(String realUser, Map<String, String> sessionConf,
+      String ipAddress) throws HiveSQLException {
+    String proxyUser = null;
+    // Http transport mode.
+    // We set the thread local proxy username, in ThriftHttpServlet.
+    if 
(hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http"))
 {
+      proxyUser = SessionInfo.getProxyUserName();
+      LOG.debug("Proxy user from query string: " + proxyUser);
+    }
+
+    if (proxyUser == null && sessionConf != null && 
sessionConf.containsKey(HiveAuthConstants.HS2_PROXY_USER)) {
+      String proxyUserFromThriftBody = 
sessionConf.get(HiveAuthConstants.HS2_PROXY_USER);
+      LOG.debug("Proxy user from thrift body: " + proxyUserFromThriftBody);
+      proxyUser = proxyUserFromThriftBody;
+    }
+
+    if (proxyUser == null) {
+      return realUser;
+    }
+
+    // check whether substitution is allowed
+    if 
(!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ALLOW_USER_SUBSTITUTION)) {
+      throw new HiveSQLException("Proxy user substitution is not allowed");
+    }
+
+    // If there's no authentication, then directly substitute the user
+    if (HiveAuthConstants.AuthTypes.NONE.toString().
+        
equalsIgnoreCase(hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION))) {
+      return proxyUser;
+    }
+
+    // Verify proxy user privilege of the realUser for the proxyUser
+    HiveAuthFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, 
hiveConf);
+    LOG.debug("Verified proxy user: " + proxyUser);
+    return proxyUser;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/7d8fa69f/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
 
b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
new file mode 100644
index 0000000..4e48cc5
--- /dev/null
+++ 
b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.HttpMethod;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TCLIService.Iface;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
+import org.apache.livy.thriftserver.LivyCLIService;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+
+
+public class ThriftHttpCLIService extends ThriftCLIService {
+  private static final String APPLICATION_THRIFT = "application/x-thrift";
+  protected org.eclipse.jetty.server.Server server;
+
+  private final Runnable oomHook;
+  public ThriftHttpCLIService(ICLIService cliService, Runnable oomHook) {
+    super(cliService, ThriftHttpCLIService.class.getSimpleName());
+    this.oomHook = oomHook;
+  }
+
+  /**
+   * Configure Jetty to serve http requests. Example of a client connection 
URL:
+   * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual 
target
+   * URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/
+   */
+  @Override
+  protected void initServer() {
+    try {
+      // Server thread pool
+      // Start with minWorkerThreads, expand till maxWorkerThreads and reject
+      // subsequent requests
+      String threadPoolName = "HiveServer2-HttpHandler-Pool";
+      ExecutorService executorService = new 
ThreadPoolExecutorWithOomHook(minWorkerThreads,
+          maxWorkerThreads,workerKeepAliveTime, TimeUnit.SECONDS,
+          new SynchronousQueue<Runnable>(), new 
ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook);
+      ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
+
+      // HTTP Server
+      server = new Server(threadPool);
+
+      ServerConnector connector;
+
+      final HttpConfiguration conf = new HttpConfiguration();
+      // Configure header size
+      int requestHeaderSize =
+          
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_REQUEST_HEADER_SIZE);
+      int responseHeaderSize =
+          
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_RESPONSE_HEADER_SIZE);
+      conf.setRequestHeaderSize(requestHeaderSize);
+      conf.setResponseHeaderSize(responseHeaderSize);
+      final HttpConnectionFactory http = new HttpConnectionFactory(conf);
+
+      boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
+      String schemeName = useSsl ? "https" : "http";
+
+      // Change connector if SSL is used
+      if (useSsl) {
+        String keyStorePath = 
hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
+        String keyStorePassword = 
ShimLoader.getHadoopShims().getPassword(hiveConf,
+            HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
+        if (keyStorePath.isEmpty()) {
+          throw new IllegalArgumentException(
+              ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname 
+              + " Not configured for SSL connection");
+        }
+        SslContextFactory sslContextFactory = new SslContextFactory();
+        String[] excludedProtocols = 
hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",");
+        LOG.info("HTTP Server SSL: adding excluded protocols: " + 
Arrays.toString(excludedProtocols));
+        sslContextFactory.addExcludeProtocols(excludedProtocols);
+        LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = "
+            + Arrays.toString(sslContextFactory.getExcludeProtocols()));
+        sslContextFactory.setKeyStorePath(keyStorePath);
+        sslContextFactory.setKeyStorePassword(keyStorePassword);
+        connector = new ServerConnector(server, sslContextFactory, http);
+      } else {
+        connector = new ServerConnector(server, http);
+      }
+
+      connector.setPort(portNum);
+      // Linux:yes, Windows:no
+      connector.setReuseAddress(true);
+      int maxIdleTime = (int) 
hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME,
+          TimeUnit.MILLISECONDS);
+      connector.setIdleTimeout(maxIdleTime);
+
+      server.addConnector(connector);
+
+      // Thrift configs
+      hiveAuthFactory = new HiveAuthFactory(hiveConf);
+      TProcessor processor = new TCLIService.Processor<Iface>(this);
+      TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+      // Set during the init phase of HiveServer2 if auth mode is kerberos
+      // UGI for the hive/_HOST (kerberos) principal
+      UserGroupInformation serviceUGI = ((LivyCLIService) 
cliService).getServiceUGI();
+      // UGI for the http/_HOST (SPNego) principal
+      UserGroupInformation httpUGI = ((LivyCLIService) 
cliService).getHttpUGI();
+      String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
+      TServlet thriftHttpServlet = new ThriftHttpServlet(processor, 
protocolFactory, authType, serviceUGI, httpUGI,
+          hiveAuthFactory);
+
+      // Context handler
+      final ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
+      context.setContextPath("/");
+      if 
(hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname, false)) 
{
+        // context.addFilter(Utils.getXSRFFilterHolder(null, null), "/" ,
+        // FilterMapping.REQUEST);
+        // Filtering does not work here currently, doing filter in 
ThriftHttpServlet
+        LOG.debug("XSRF filter enabled");
+      } else {
+        LOG.warn("XSRF filter disabled");
+      }
+
+      final String httpPath = 
getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
+
+      if (HiveConf.getBoolVar(hiveConf, 
ConfVars.HIVE_SERVER2_THRIFT_HTTP_COMPRESSION_ENABLED)) {
+        final GzipHandler gzipHandler = new GzipHandler();
+        gzipHandler.setHandler(context);
+        gzipHandler.addIncludedMethods(HttpMethod.POST);
+        gzipHandler.addIncludedMimeTypes(APPLICATION_THRIFT);
+        server.setHandler(gzipHandler);
+      } else {
+        server.setHandler(context);
+      }
+      context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
+
+      // TODO: check defaults: maxTimeout, keepalive, maxBodySize,
+      // bodyRecieveDuration, etc.
+      // Finally, start the server
+      server.start();
+      String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " 
in " + schemeName
+          + " mode on port " + portNum + " path=" + httpPath + " with " + 
minWorkerThreads + "..."
+          + maxWorkerThreads + " worker threads";
+      LOG.info(msg);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to init HttpServer", e);
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      server.join();
+    } catch (Throwable t) {
+      if (t instanceof InterruptedException) {
+        // This is likely a shutdown
+        LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down 
thrift server.");
+      } else {
+        LOG.error("Exception caught by " + 
ThriftHttpCLIService.class.getSimpleName() +
+            ". Exiting.", t);
+        System.exit(-1);
+      }
+    }
+  }
+
+  /**
+   * The config parameter can be like "path", "/path", "/path/", "path/*", 
"/path1/path2/*" and so on.
+   * httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*"
+   * @param httpPath
+   * @return
+   */
+  private String getHttpPath(String httpPath) {
+    if(httpPath == null || httpPath.equals("")) {
+      httpPath = "/*";
+    }
+    else {
+      if(!httpPath.startsWith("/")) {
+        httpPath = "/" + httpPath;
+      }
+      if(httpPath.endsWith("/")) {
+        httpPath = httpPath + "*";
+      }
+      if(!httpPath.endsWith("/*")) {
+        httpPath = httpPath + "/*";
+      }
+    }
+    return httpPath;
+  }
+
+  @Override
+  protected void stopServer() {
+    if((server != null) && server.isStarted()) {
+      try {
+        server.stop();
+        server = null;
+        LOG.info("Thrift HTTP server has been stopped");
+      } catch (Exception e) {
+        LOG.error("Error stopping HTTP server: ", e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/7d8fa69f/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
 
b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
new file mode 100644
index 0000000..33c942f
--- /dev/null
+++ 
b/thriftserver/server/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.PrivilegedExceptionAction;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.NewCookie;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.CookieSigner;
+import org.apache.hive.service.auth.AuthenticationProviderFactory;
+import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
+import org.apache.hive.service.auth.HiveAuthConstants;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.HttpAuthUtils;
+import org.apache.hive.service.auth.HttpAuthenticationException;
+import org.apache.hive.service.auth.PasswdAuthenticationProvider;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.thriftserver.SessionInfo;
+
+/**
+ *
+ * ThriftHttpServlet
+ *
+ */
+public class ThriftHttpServlet extends TServlet {
+
+  private static final long serialVersionUID = 1L;
+  public static final Logger LOG = 
LoggerFactory.getLogger(ThriftHttpServlet.class.getName());
+  private final String authType;
+  private final UserGroupInformation serviceUGI;
+  private final UserGroupInformation httpUGI;
+  private HiveConf hiveConf = new HiveConf();
+
+  // Class members for cookie based authentication.
+  private CookieSigner signer;
+  public static final String AUTH_COOKIE = "hive.server2.auth";
+  private static final SecureRandom RAN = new SecureRandom();
+  private boolean isCookieAuthEnabled;
+  private String cookieDomain;
+  private String cookiePath;
+  private int cookieMaxAge;
+  private boolean isCookieSecure;
+  private boolean isHttpOnlyCookie;
+  private final HiveAuthFactory hiveAuthFactory;
+  private static final String HIVE_DELEGATION_TOKEN_HEADER =  
"X-Hive-Delegation-Token";
+  private static final String X_FORWARDED_FOR = "X-Forwarded-For";
+
+  public ThriftHttpServlet(TProcessor processor, TProtocolFactory 
protocolFactory,
+      String authType, UserGroupInformation serviceUGI, UserGroupInformation 
httpUGI,
+      HiveAuthFactory hiveAuthFactory) {
+    super(processor, protocolFactory);
+    this.authType = authType;
+    this.serviceUGI = serviceUGI;
+    this.httpUGI = httpUGI;
+    this.hiveAuthFactory = hiveAuthFactory;
+    this.isCookieAuthEnabled = hiveConf.getBoolVar(
+      ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_AUTH_ENABLED);
+    // Initialize the cookie based authentication related variables.
+    if (isCookieAuthEnabled) {
+      // Generate the signer with secret.
+      String secret = Long.toString(RAN.nextLong());
+      LOG.debug("Using the random number as the secret for cookie generation " 
+ secret);
+      this.signer = new CookieSigner(secret.getBytes());
+      this.cookieMaxAge = (int) hiveConf.getTimeVar(
+        ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE, TimeUnit.SECONDS);
+      this.cookieDomain = 
hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_DOMAIN);
+      this.cookiePath = 
hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_PATH);
+      // always send secure cookies for SSL mode
+      this.isCookieSecure = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
+      this.isHttpOnlyCookie = hiveConf.getBoolVar(
+        ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_IS_HTTPONLY);
+    }
+  }
+
+  @Override
+  protected void doPost(HttpServletRequest request, HttpServletResponse 
response)
+      throws ServletException, IOException {
+    String clientUserName = null;
+    String clientIpAddress;
+    boolean requireNewCookie = false;
+
+    try {
+      if 
(hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname,false)){
+        boolean continueProcessing = 
Utils.doXsrfFilter(request,response,null,null);
+        if (!continueProcessing){
+          LOG.warn("Request did not have valid XSRF header, rejecting.");
+          return;
+        }
+      }
+      // If the cookie based authentication is already enabled, parse the
+      // request and validate the request cookies.
+      if (isCookieAuthEnabled) {
+        clientUserName = validateCookie(request);
+        requireNewCookie = (clientUserName == null);
+        if (requireNewCookie) {
+          LOG.info("Could not validate cookie sent, will try to generate a new 
cookie");
+        }
+      }
+      // If the cookie based authentication is not enabled or the request does
+      // not have a valid cookie, use the kerberos or password based 
authentication
+      // depending on the server setup.
+      if (clientUserName == null) {
+        // For a kerberos setup
+        if (isKerberosAuthMode(authType)) {
+          String delegationToken = 
request.getHeader(HIVE_DELEGATION_TOKEN_HEADER);
+          // Each http request must have an Authorization header
+          if ((delegationToken != null) && (!delegationToken.isEmpty())) {
+            clientUserName = doTokenAuth(request, response);
+          } else {
+            clientUserName = doKerberosAuth(request);
+          }
+        }
+        // For password based authentication
+        else {
+          clientUserName = doPasswdAuth(request, authType);
+        }
+      }
+      LOG.debug("Client username: " + clientUserName);
+
+      // Set the thread local username to be used for doAs if true
+      SessionInfo.setUserName(clientUserName);
+
+      // find proxy user if any from query param
+      String doAsQueryParam = getDoAsQueryParam(request.getQueryString());
+      if (doAsQueryParam != null) {
+        SessionInfo.setProxyUserName(doAsQueryParam);
+      }
+
+      clientIpAddress = request.getRemoteAddr();
+      LOG.debug("Client IP Address: " + clientIpAddress);
+      // Set the thread local ip address
+      SessionInfo.setIpAddress(clientIpAddress);
+
+      // get forwarded hosts address
+      String forwarded_for = request.getHeader(X_FORWARDED_FOR);
+      if (forwarded_for != null) {
+        LOG.debug("{}:{}", X_FORWARDED_FOR, forwarded_for);
+        List<String> forwardedAddresses = 
Arrays.asList(forwarded_for.split(","));
+        SessionInfo.setForwardedAddresses(forwardedAddresses);
+      } else {
+        SessionInfo.setForwardedAddresses(Collections.<String>emptyList());
+      }
+
+      // Generate new cookie and add it to the response
+      if (requireNewCookie &&
+          
!authType.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.toString())) {
+        String cookieToken = HttpAuthUtils.createCookieToken(clientUserName);
+        Cookie hs2Cookie = createCookie(signer.signCookie(cookieToken));
+
+        if (isHttpOnlyCookie) {
+          response.setHeader("SET-COOKIE", getHttpOnlyCookieHeader(hs2Cookie));
+        } else {
+          response.addCookie(hs2Cookie);
+        }
+        LOG.info("Cookie added for clientUserName " + clientUserName);
+      }
+      super.doPost(request, response);
+    }
+    catch (HttpAuthenticationException e) {
+      LOG.error("Error: ", e);
+      // Send a 401 to the client
+      response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+      if(isKerberosAuthMode(authType)) {
+        response.addHeader(HttpAuthUtils.WWW_AUTHENTICATE, 
HttpAuthUtils.NEGOTIATE);
+      }
+      response.getWriter().println("Authentication Error: " + e.getMessage());
+    }
+    finally {
+      // Clear the thread locals
+      SessionInfo.clearUserName();
+      SessionInfo.clearIpAddress();
+      SessionInfo.clearProxyUserName();
+      SessionInfo.clearForwardedAddresses();
+    }
+  }
+
+  /**
+   * Retrieves the client name from cookieString. If the cookie does not
+   * correspond to a valid client, the function returns null.
+   * @param cookies HTTP Request cookies.
+   * @return Client Username if cookieString has a HS2 Generated cookie that 
is currently valid.
+   * Else, returns null.
+   */
+  private String getClientNameFromCookie(Cookie[] cookies) {
+    // Current Cookie Name, Current Cookie Value
+    String currName, currValue;
+
+    // Following is the main loop which iterates through all the cookies send 
by the client.
+    // The HS2 generated cookies are of the format hive.server2.auth=<value>
+    // A cookie which is identified as a hiveserver2 generated cookie is 
validated
+    // by calling signer.verifyAndExtract(). If the validation passes, send the
+    // username for which the cookie is validated to the caller. If no client 
side
+    // cookie passes the validation, return null to the caller.
+    for (Cookie currCookie : cookies) {
+      // Get the cookie name
+      currName = currCookie.getName();
+      if (!currName.equals(AUTH_COOKIE)) {
+        // Not a HS2 generated cookie, continue.
+        continue;
+      }
+      // If we reached here, we have match for HS2 generated cookie
+      currValue = currCookie.getValue();
+      // Validate the value.
+      currValue = signer.verifyAndExtract(currValue);
+      // Retrieve the user name, do the final validation step.
+      if (currValue != null) {
+        String userName = HttpAuthUtils.getUserNameFromCookieToken(currValue);
+
+        if (userName == null) {
+          LOG.warn("Invalid cookie token " + currValue);
+          continue;
+        }
+        //We have found a valid cookie in the client request.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Validated the cookie for user " + userName);
+        }
+        return userName;
+      }
+    }
+    // No valid HS2 generated cookies found, return null
+    return null;
+  }
+
+  /**
+   * Convert cookie array to human readable cookie string
+   * @param cookies Cookie Array
+   * @return String containing all the cookies separated by a newline 
character.
+   * Each cookie is of the format [key]=[value]
+   */
+  private String toCookieStr(Cookie[] cookies) {
+       String cookieStr = "";
+
+       for (Cookie c : cookies) {
+     cookieStr += c.getName() + "=" + c.getValue() + " ;\n";
+    }
+    return cookieStr;
+  }
+
+  /**
+   * Validate the request cookie. This function iterates over the request 
cookie headers
+   * and finds a cookie that represents a valid client/server session. If it 
finds one, it
+   * returns the client name associated with the session. Else, it returns 
null.
+   * @param request The HTTP Servlet Request send by the client
+   * @return Client Username if the request has valid HS2 cookie, else returns 
null
+   * @throws UnsupportedEncodingException
+   */
+  private String validateCookie(HttpServletRequest request) throws 
UnsupportedEncodingException {
+    // Find all the valid cookies associated with the request.
+    Cookie[] cookies = request.getCookies();
+
+    if (cookies == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No valid cookies associated with the request " + request);
+      }
+      return null;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received cookies: " + toCookieStr(cookies));
+    }
+    return getClientNameFromCookie(cookies);
+  }
+
+  /**
+   * Generate a server side cookie given the cookie value as the input.
+   * @param str Input string token.
+   * @return The generated cookie.
+   * @throws UnsupportedEncodingException
+   */
+  private Cookie createCookie(String str) throws UnsupportedEncodingException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cookie name = " + AUTH_COOKIE + " value = " + str);
+    }
+    Cookie cookie = new Cookie(AUTH_COOKIE, str);
+
+    cookie.setMaxAge(cookieMaxAge);
+    if (cookieDomain != null) {
+      cookie.setDomain(cookieDomain);
+    }
+    if (cookiePath != null) {
+      cookie.setPath(cookiePath);
+    }
+    cookie.setSecure(isCookieSecure);
+    return cookie;
+  }
+
+  /**
+   * Generate httponly cookie from HS2 cookie
+   * @param cookie HS2 generated cookie
+   * @return The httponly cookie
+   */
+  private static String getHttpOnlyCookieHeader(Cookie cookie) {
+    NewCookie newCookie = new NewCookie(cookie.getName(), cookie.getValue(),
+      cookie.getPath(), cookie.getDomain(), cookie.getVersion(),
+      cookie.getComment(), cookie.getMaxAge(), cookie.getSecure());
+    return newCookie + "; HttpOnly";
+  }
+
+  /**
+   * Do the LDAP/PAM authentication
+   * @param request
+   * @param authType
+   * @throws HttpAuthenticationException
+   */
+  private String doPasswdAuth(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String userName = getUsername(request, authType);
+    // No-op when authType is NOSASL
+    if 
(!authType.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.toString())) {
+      try {
+        AuthMethods authMethod = AuthMethods.getValidAuthMethod(authType);
+        PasswdAuthenticationProvider provider =
+            
AuthenticationProviderFactory.getAuthenticationProvider(authMethod, hiveConf);
+        provider.Authenticate(userName, getPassword(request, authType));
+
+      } catch (Exception e) {
+        throw new HttpAuthenticationException(e);
+      }
+    }
+    return userName;
+  }
+
+  private String doTokenAuth(HttpServletRequest request, HttpServletResponse 
response)
+      throws HttpAuthenticationException {
+    String tokenStr = request.getHeader(HIVE_DELEGATION_TOKEN_HEADER);
+    try {
+      return hiveAuthFactory.verifyDelegationToken(tokenStr);
+    } catch (HiveSQLException e) {
+      throw new HttpAuthenticationException(e);
+    }
+  }
+
+  /**
+   * Do the GSS-API kerberos authentication.
+   * We already have a logged in subject in the form of serviceUGI,
+   * which GSS-API will extract information from.
+   * In case of a SPNego request we use the httpUGI,
+   * for the authenticating service tickets.
+   * @param request
+   * @return
+   * @throws HttpAuthenticationException
+   */
+  private String doKerberosAuth(HttpServletRequest request)
+      throws HttpAuthenticationException {
+    // Try authenticating with the http/_HOST principal
+    if (httpUGI != null) {
+      try {
+        return httpUGI.doAs(new HttpKerberosServerAction(request, httpUGI));
+      } catch (Exception e) {
+        LOG.info("Failed to authenticate with http/_HOST kerberos principal, " 
+
+            "trying with hive/_HOST kerberos principal");
+      }
+    }
+    // Now try with hive/_HOST principal
+    try {
+      return serviceUGI.doAs(new HttpKerberosServerAction(request, 
serviceUGI));
+    } catch (Exception e) {
+      LOG.error("Failed to authenticate with hive/_HOST kerberos principal");
+      throw new HttpAuthenticationException(e);
+    }
+
+  }
+
+  class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
+    HttpServletRequest request;
+    UserGroupInformation serviceUGI;
+
+    HttpKerberosServerAction(HttpServletRequest request,
+        UserGroupInformation serviceUGI) {
+      this.request = request;
+      this.serviceUGI = serviceUGI;
+    }
+
+    @Override
+    public String run() throws HttpAuthenticationException {
+      // Get own Kerberos credentials for accepting connection
+      GSSManager manager = GSSManager.getInstance();
+      GSSContext gssContext = null;
+      String serverPrincipal = getPrincipalWithoutRealm(
+          serviceUGI.getUserName());
+      try {
+        // This Oid for Kerberos GSS-API mechanism.
+        Oid kerberosMechOid = new Oid("1.2.840.113554.1.2.2");
+        // Oid for SPNego GSS-API mechanism.
+        Oid spnegoMechOid = new Oid("1.3.6.1.5.5.2");
+        // Oid for kerberos principal name
+        Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+
+        // GSS name for server
+        GSSName serverName = manager.createName(serverPrincipal, 
krb5PrincipalOid);
+
+        // GSS credentials for server
+        GSSCredential serverCreds = manager.createCredential(serverName,
+            GSSCredential.DEFAULT_LIFETIME,
+            new Oid[]{kerberosMechOid, spnegoMechOid},
+            GSSCredential.ACCEPT_ONLY);
+
+        // Create a GSS context
+        gssContext = manager.createContext(serverCreds);
+        // Get service ticket from the authorization header
+        String serviceTicketBase64 = getAuthHeader(request, authType);
+        byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+        gssContext.acceptSecContext(inToken, 0, inToken.length);
+        // Authenticate or deny based on its context completion
+        if (!gssContext.isEstablished()) {
+          throw new HttpAuthenticationException("Kerberos authentication 
failed: " +
+              "unable to establish context with the service ticket " +
+              "provided by the client.");
+        }
+        else {
+          return 
getPrincipalWithoutRealmAndHost(gssContext.getSrcName().toString());
+        }
+      }
+      catch (GSSException e) {
+        throw new HttpAuthenticationException("Kerberos authentication failed: 
", e);
+      }
+      finally {
+        if (gssContext != null) {
+          try {
+            gssContext.dispose();
+          } catch (GSSException e) {
+            // No-op
+          }
+        }
+      }
+    }
+
+    private String getPrincipalWithoutRealm(String fullPrincipal)
+        throws HttpAuthenticationException {
+      KerberosNameShim fullKerberosName;
+      try {
+        fullKerberosName = 
ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+      } catch (IOException e) {
+        throw new HttpAuthenticationException(e);
+      }
+      String serviceName = fullKerberosName.getServiceName();
+      String hostName = fullKerberosName.getHostName();
+      String principalWithoutRealm = serviceName;
+      if (hostName != null) {
+        principalWithoutRealm = serviceName + "/" + hostName;
+      }
+      return principalWithoutRealm;
+    }
+
+    private String getPrincipalWithoutRealmAndHost(String fullPrincipal)
+        throws HttpAuthenticationException {
+      KerberosNameShim fullKerberosName;
+      try {
+        fullKerberosName = 
ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+        return fullKerberosName.getShortName();
+      } catch (IOException e) {
+        throw new HttpAuthenticationException(e);
+      }
+    }
+  }
+
+  private String getUsername(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String creds[] = getAuthHeaderTokens(request, authType);
+    // Username must be present
+    if (creds[0] == null || creds[0].isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain username.");
+    }
+    return creds[0];
+  }
+
+  private String getPassword(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String creds[] = getAuthHeaderTokens(request, authType);
+    // Password must be present
+    if (creds[1] == null || creds[1].isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain username.");
+    }
+    return creds[1];
+  }
+
+  private String[] getAuthHeaderTokens(HttpServletRequest request,
+      String authType) throws HttpAuthenticationException {
+    String authHeaderBase64 = getAuthHeader(request, authType);
+    String authHeaderString = StringUtils.newStringUtf8(
+        Base64.decodeBase64(authHeaderBase64.getBytes()));
+    String[] creds = authHeaderString.split(":");
+    return creds;
+  }
+
+  /**
+   * Returns the base64 encoded auth header payload
+   * @param request
+   * @param authType
+   * @return
+   * @throws HttpAuthenticationException
+   */
+  private String getAuthHeader(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String authHeader = request.getHeader(HttpAuthUtils.AUTHORIZATION);
+    // Each http request must have an Authorization header
+    if (authHeader == null || authHeader.isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client is empty.");
+    }
+
+    String authHeaderBase64String;
+    int beginIndex;
+    if (isKerberosAuthMode(authType)) {
+      beginIndex = (HttpAuthUtils.NEGOTIATE + " ").length();
+    }
+    else {
+      beginIndex = (HttpAuthUtils.BASIC + " ").length();
+    }
+    authHeaderBase64String = authHeader.substring(beginIndex);
+    // Authorization header must have a payload
+    if (authHeaderBase64String == null || authHeaderBase64String.isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain any data.");
+    }
+    return authHeaderBase64String;
+  }
+
+  private boolean isKerberosAuthMode(String authType) {
+    return 
authType.equalsIgnoreCase(HiveAuthConstants.AuthTypes.KERBEROS.toString());
+  }
+
+  private static String getDoAsQueryParam(String queryString) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("URL query string:" + queryString);
+    }
+    if (queryString == null) {
+      return null;
+    }
+    Map<String, String[]> params = 
javax.servlet.http.HttpUtils.parseQueryString( queryString );
+    Set<String> keySet = params.keySet();
+    for (String key: keySet) {
+      if (key.equalsIgnoreCase("doAs")) {
+        return params.get(key)[0];
+      }
+    }
+    return null;
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/7d8fa69f/thriftserver/server/src/main/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/java/org/apache/hive/service/server/HiveServer2.java
 
b/thriftserver/server/src/main/java/org/apache/hive/service/server/HiveServer2.java
new file mode 100644
index 0000000..3db0590
--- /dev/null
+++ 
b/thriftserver/server/src/main/java/org/apache/hive/service/server/HiveServer2.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JvmPauseMonitor;
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
+import org.apache.hadoop.hive.common.cli.CommonCliOptions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
+import org.apache.hive.service.cli.thrift.ThriftCLIService;
+import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HiveServer2.
+ *
+ */
+public class HiveServer2 extends CompositeService {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
+  public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri";
+  protected ICLIService cliService;
+  protected ThriftCLIService thriftCLIService;
+
+  public HiveServer2() {
+    super(HiveServer2.class.getSimpleName());
+    HiveConf.setLoadHiveServer2Config(true);
+  }
+
+  @Override
+  public synchronized void init(HiveConf hiveConf) {
+    assert cliService != null;
+    assert cliService instanceof CompositeService;
+    addService((CompositeService) cliService);
+    final HiveServer2 hiveServer2 = this;
+    Runnable oomHook = new Runnable() {
+      @Override
+      public void run() {
+        hiveServer2.stop();
+      }
+    };
+    if (isHTTPTransportMode(hiveConf)) {
+      thriftCLIService = new ThriftHttpCLIService(cliService, oomHook);
+    } else {
+      thriftCLIService = new ThriftBinaryCLIService(cliService, oomHook);
+    }
+    addService(thriftCLIService);
+    super.init(hiveConf);
+    // Add a shutdown hook for catching SIGTERM & SIGINT
+    ShutdownHookManager.addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        hiveServer2.stop();
+      }
+    });
+  }
+
+  public static boolean isHTTPTransportMode(Configuration hiveConf) {
+    String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
+    if (transportMode == null) {
+      transportMode = 
hiveConf.get(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname);
+    }
+    if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isKerberosAuthMode(Configuration hiveConf) {
+    String authMode = 
hiveConf.get(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname);
+    if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) {
+      return true;
+    }
+    return false;
+  }
+
+  public String getServerHost() throws Exception {
+    if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() 
== null)) {
+      throw new Exception("Unable to get the server address; it hasn't been 
initialized yet.");
+    }
+    return thriftCLIService.getServerIPAddress().getHostName();
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    LOG.info("Shutting down HiveServer2");
+    super.stop();
+  }
+
+  private static void startHiveServer2() throws Throwable {
+    long attempts = 0, maxAttempts = 1;
+    while (true) {
+      LOG.info("Starting HiveServer2");
+      HiveConf hiveConf = new HiveConf();
+      maxAttempts = 
hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
+      long retrySleepIntervalMs = hiveConf
+          
.getTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS,
+              TimeUnit.MILLISECONDS);
+      HiveServer2 server = null;
+      try {
+        server = new HiveServer2();
+        server.init(hiveConf);
+        server.start();
+
+        try {
+          JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf);
+          pauseMonitor.start();
+        } catch (Throwable t) {
+          LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs 
and Pauses may not be " +
+            "warned upon.", t);
+        }
+        break;
+      } catch (Throwable throwable) {
+        if (server != null) {
+          try {
+            server.stop();
+          } catch (Throwable t) {
+            LOG.info("Exception caught when calling stop of HiveServer2 before 
retrying start", t);
+          } finally {
+            server = null;
+          }
+        }
+        if (++attempts >= maxAttempts) {
+          throw new Error("Max start attempts " + maxAttempts + " exhausted", 
throwable);
+        } else {
+          LOG.warn("Error starting HiveServer2 on attempt " + attempts
+              + ", will retry in " + retrySleepIntervalMs + "ms", throwable);
+          try {
+            Thread.sleep(retrySleepIntervalMs);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    HiveConf.setLoadHiveServer2Config(true);
+    try {
+      ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
+      ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
+
+      // NOTE: It is critical to do this here so that log4j is reinitialized
+      // before any of the other core hive classes are loaded
+      String initLog4jMessage = LogUtils.initHiveLog4j();
+      LOG.debug(initLog4jMessage);
+      HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
+
+      // Logger debug message from "oproc" after log4j initialize properly
+      LOG.debug(oproc.getDebugMessage().toString());
+
+      // Call the executor which will execute the appropriate command based on 
the parsed options
+      oprocResponse.getServerOptionsExecutor().execute();
+    } catch (LogInitializationException e) {
+      LOG.error("Error initializing log: " + e.getMessage(), e);
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * ServerOptionsProcessor.
+   * Process arguments given to HiveServer2 (-hiveconf property=value)
+   * Set properties in System properties
+   * Create an appropriate response object,
+   * which has executor to execute the appropriate command based on the parsed 
options.
+   */
+  public static class ServerOptionsProcessor {
+    private final Options options = new Options();
+    private org.apache.commons.cli.CommandLine commandLine;
+    private final String serverName;
+    private final StringBuilder debugMessage = new StringBuilder();
+
+    @SuppressWarnings("static-access")
+    public ServerOptionsProcessor(String serverName) {
+      this.serverName = serverName;
+      // -hiveconf x=y
+      options.addOption(OptionBuilder
+          .withValueSeparator()
+          .hasArgs(2)
+          .withArgName("property=value")
+          .withLongOpt("hiveconf")
+          .withDescription("Use value for given property")
+          .create());
+      options.addOption(new Option("H", "help", false, "Print help 
information"));
+    }
+
+    public ServerOptionsProcessorResponse parse(String[] argv) {
+      try {
+        commandLine = new GnuParser().parse(options, argv);
+        // Process --hiveconf
+        // Get hiveconf param values and set the System property values
+        Properties confProps = commandLine.getOptionProperties("hiveconf");
+        for (String propKey : confProps.stringPropertyNames()) {
+          // save logging message for log4j output latter after log4j 
initialize properly
+          debugMessage.append("Setting " + propKey + "=" + 
confProps.getProperty(propKey) + ";\n");
+          if (propKey.equalsIgnoreCase("hive.root.logger")) {
+            CommonCliOptions.splitAndSetLogger(propKey, confProps);
+          } else {
+            System.setProperty(propKey, confProps.getProperty(propKey));
+          }
+        }
+
+        // Process --help
+        if (commandLine.hasOption('H')) {
+          return new ServerOptionsProcessorResponse(new 
HelpOptionExecutor(serverName, options));
+        }
+      } catch (ParseException e) {
+        // Error out & exit - we were not able to parse the args successfully
+        System.err.println("Error starting HiveServer2 with given arguments: 
");
+        System.err.println(e.getMessage());
+        System.exit(-1);
+      }
+      // Default executor, when no option is specified
+      return new ServerOptionsProcessorResponse(new StartOptionExecutor());
+    }
+
+    StringBuilder getDebugMessage() {
+      return debugMessage;
+    }
+  }
+
+  /**
+   * The response sent back from {@link ServerOptionsProcessor#parse(String[])}
+   */
+  static class ServerOptionsProcessorResponse {
+    private final ServerOptionsExecutor serverOptionsExecutor;
+
+    ServerOptionsProcessorResponse(ServerOptionsExecutor 
serverOptionsExecutor) {
+      this.serverOptionsExecutor = serverOptionsExecutor;
+    }
+
+    ServerOptionsExecutor getServerOptionsExecutor() {
+      return serverOptionsExecutor;
+    }
+  }
+
+  /**
+   * The executor interface for running the appropriate HiveServer2 command 
based on parsed options
+   */
+  static interface ServerOptionsExecutor {
+    public void execute();
+  }
+
+  /**
+   * HelpOptionExecutor: executes the --help option by printing out the usage
+   */
+  static class HelpOptionExecutor implements ServerOptionsExecutor {
+    private final Options options;
+    private final String serverName;
+
+    HelpOptionExecutor(String serverName, Options options) {
+      this.options = options;
+      this.serverName = serverName;
+    }
+
+    @Override
+    public void execute() {
+      new HelpFormatter().printHelp(serverName, options);
+      System.exit(0);
+    }
+  }
+
+  /**
+   * StartOptionExecutor: starts HiveServer2.
+   * This is the default executor, when no option is specified.
+   */
+  static class StartOptionExecutor implements ServerOptionsExecutor {
+    @Override
+    public void execute() {
+      try {
+        startHiveServer2();
+      } catch (Throwable t) {
+        LOG.error("Error starting HiveServer2", t);
+        System.exit(-1);
+      }
+    }
+  }
+}

Reply via email to