Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1622396&r1=1622395&r2=1622396&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Thu Sep 4 02:49:46 2014 @@ -54,6 +54,7 @@ import org.apache.hive.service.cli.opera import org.apache.hive.service.cli.operation.GetTableTypesOperation; import org.apache.hive.service.cli.operation.GetTypeInfoOperation; import org.apache.hive.service.cli.operation.MetadataOperation; +import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; import org.apache.hive.service.server.ThreadWithGarbageCleanup; @@ -84,6 +85,8 @@ public class HiveSessionImpl implements private boolean isOperationLogEnabled; private File sessionLogDir; + private volatile long lastAccessTime; + public HiveSessionImpl(TProtocolVersion protocol, String username, String password, HiveConf serverhiveConf, String ipAddress) { this.username = username; @@ -108,6 +111,8 @@ public class HiveSessionImpl implements sessionState = new SessionState(hiveConf, username); sessionState.setUserIpAddress(ipAddress); sessionState.setIsHiveServerQuery(true); + + lastAccessTime = System.currentTimeMillis(); SessionState.start(sessionState); } @@ -239,10 +244,13 @@ public class HiveSessionImpl implements SessionState.start(sessionState); } - protected synchronized void acquire() throws HiveSQLException { + protected synchronized void acquire(boolean userAccess) { // Need to make sure that the this HiveServer2's session's session state is // stored in the thread local for the handler thread. SessionState.setCurrentSessionState(sessionState); + if (userAccess) { + lastAccessTime = System.currentTimeMillis(); + } } /** @@ -252,14 +260,16 @@ public class HiveSessionImpl implements * when this thread is garbage collected later. * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() */ - protected synchronized void release() { - assert sessionState != null; + protected synchronized void release(boolean userAccess) { SessionState.detachSession(); if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { ThreadWithGarbageCleanup currentThread = (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); currentThread.cacheThreadLocalRawStore(); } + if (userAccess) { + lastAccessTime = System.currentTimeMillis(); + } } @Override @@ -298,7 +308,7 @@ public class HiveSessionImpl implements @Override public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException { - acquire(); + acquire(true); try { switch (getInfoType) { case CLI_SERVER_NAME: @@ -318,7 +328,7 @@ public class HiveSessionImpl implements throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString()); } } finally { - release(); + release(true); } } @@ -337,7 +347,7 @@ public class HiveSessionImpl implements private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); ExecuteStatementOperation operation = operationManager @@ -355,14 +365,14 @@ public class HiveSessionImpl implements } throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getTypeInfo() throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession()); @@ -375,14 +385,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getCatalogs() throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession()); @@ -395,14 +405,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getSchemas(String catalogName, String schemaName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetSchemasOperation operation = @@ -416,7 +426,7 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @@ -424,7 +434,7 @@ public class HiveSessionImpl implements public OperationHandle getTables(String catalogName, String schemaName, String tableName, List<String> tableTypes) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); MetadataOperation operation = @@ -438,14 +448,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getTableTypes() throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession()); @@ -458,14 +468,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getColumns(String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(), @@ -479,14 +489,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getFunctions(String catalogName, String schemaName, String functionName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetFunctionsOperation operation = operationManager @@ -500,14 +510,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public void close() throws HiveSQLException { try { - acquire(); + acquire(true); /** * For metadata operations like getTables(), getColumns() etc, * the session allocates a private metastore handler which should be @@ -532,7 +542,7 @@ public class HiveSessionImpl implements } catch (IOException ioe) { throw new HiveSQLException("Failure to close", ioe); } finally { - release(); + release(true); } } @@ -562,50 +572,79 @@ public class HiveSessionImpl implements } @Override + public long getLastAccessTime() { + return lastAccessTime; + } + + @Override + public void closeExpiredOperations() { + OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]); + if (handles.length > 0) { + List<Operation> operations = operationManager.removeExpiredOperations(handles); + if (!operations.isEmpty()) { + closeTimedOutOperations(operations); + } + } + } + + private void closeTimedOutOperations(List<Operation> operations) { + acquire(false); + try { + for (Operation operation : operations) { + opHandleSet.remove(operation.getHandle()); + try { + operation.close(); + } catch (Exception e) { + LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e); + } + } + } finally { + release(false); + } + } + + @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { sessionManager.getOperationManager().cancelOperation(opHandle); } finally { - release(); + release(true); } } @Override public void closeOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { operationManager.closeOperation(opHandle); opHandleSet.remove(opHandle); } finally { - release(); + release(true); } } @Override public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle); } finally { - release(); + release(true); } } @Override public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException { - acquire(); + acquire(true); try { if (fetchType == FetchType.QUERY_OUTPUT) { - return sessionManager.getOperationManager() - .getOperationNextRowSet(opHandle, orientation, maxRows); - } else { - return sessionManager.getOperationManager() - .getOperationLogRowSet(opHandle, orientation, maxRows); + return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows); } + return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows); } finally { - release(); + release(true); } }
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1622396&r1=1622395&r2=1622396&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Thu Sep 4 02:49:46 2014 @@ -19,7 +19,6 @@ package org.apache.hive.service.cli.session; import java.io.IOException; -import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -48,6 +47,14 @@ public class HiveSessionImplwithUGI exte super(protocol, username, password, hiveConf, ipAddress); setSessionUGI(username); setDelegationToken(delegationToken); + + // create a new metastore connection for this particular user session + Hive.set(null); + try { + sessionHive = Hive.get(getHiveConf()); + } catch (HiveException e) { + throw new HiveSQLException("Failed to setup metastore connection", e); + } } // setup appropriate UGI for the session @@ -75,8 +82,8 @@ public class HiveSessionImplwithUGI exte } @Override - protected synchronized void acquire() throws HiveSQLException { - super.acquire(); + protected synchronized void acquire(boolean userAccess) { + super.acquire(userAccess); // if we have a metastore connection with impersonation, then set it first if (sessionHive != null) { Hive.set(sessionHive); @@ -90,11 +97,11 @@ public class HiveSessionImplwithUGI exte @Override public void close() throws HiveSQLException { try { - acquire(); + acquire(true); ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi); cancelDelegationToken(); } finally { - release(); + release(true); super.close(); } } @@ -115,13 +122,6 @@ public class HiveSessionImplwithUGI exte } catch (IOException e) { throw new HiveSQLException("Couldn't setup delegation token in the ugi", e); } - // create a new metastore connection using the delegation token - Hive.set(null); - try { - sessionHive = Hive.get(getHiveConf()); - } catch (HiveException e) { - throw new HiveSQLException("Failed to setup metastore connection", e); - } } } Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1622396&r1=1622395&r2=1622396&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Thu Sep 4 02:49:46 2014 @@ -20,6 +20,8 @@ package org.apache.hive.service.cli.sess import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -59,6 +61,11 @@ public class SessionManager extends Comp private boolean isOperationLogEnabled; private File operationLogRootDir; + private long checkInterval; + private long sessionTimeout; + + private volatile boolean shutdown; + public SessionManager() { super("SessionManager"); } @@ -81,20 +88,28 @@ public class SessionManager extends Comp } private void createBackgroundOperationPool() { - int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize); - int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); - LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize); - int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME); - LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime); - // Create a thread pool with #backgroundPoolSize threads + int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); + LOG.info("HiveServer2: Background operation thread pool size: " + poolSize); + int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); + LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize); + long keepAliveTime = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS); + LOG.info( + "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds"); + + // Create a thread pool with #poolSize threads // Threads terminate when they are idle for more than the keepAliveTime - // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize String threadPoolName = "HiveServer2-Background-Pool"; - backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, - keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize), + backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize, + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize), new ThreadFactoryWithGarbageCleanup(threadPoolName)); backgroundOperationPool.allowCoreThreadTimeOut(true); + + checkInterval = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + sessionTimeout = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); } private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException { @@ -139,20 +154,61 @@ public class SessionManager extends Comp @Override public synchronized void start() { super.start(); + if (checkInterval > 0) { + startTimeoutChecker(); + } + } + + private void startTimeoutChecker() { + final long interval = Math.max(checkInterval, 3000l); // minimum 3 seconds + Runnable timeoutChecker = new Runnable() { + @Override + public void run() { + for (sleepInterval(interval); !shutdown; sleepInterval(interval)) { + long current = System.currentTimeMillis(); + for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) { + if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) { + SessionHandle handle = session.getSessionHandle(); + LOG.warn("Session " + handle + " is Timed-out (last access : " + + new Date(session.getLastAccessTime()) + ") and will be closed"); + try { + closeSession(handle); + } catch (HiveSQLException e) { + LOG.warn("Exception is thrown closing session " + handle, e); + } + } else { + session.closeExpiredOperations(); + } + } + } + } + + private void sleepInterval(long interval) { + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + // ignore + } + } + }; + backgroundOperationPool.execute(timeoutChecker); } @Override public synchronized void stop() { super.stop(); + shutdown = true; if (backgroundOperationPool != null) { backgroundOperationPool.shutdown(); - int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); + long timeout = hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); try { backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS); } catch (InterruptedException e) { LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + " seconds has been exceeded. RUNNING background operations will be shut down", e); } + backgroundOperationPool = null; } cleanupLoggingRootDir(); } Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1622396&r1=1622395&r2=1622396&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Thu Sep 4 02:49:46 2014 @@ -70,7 +70,8 @@ public class ThriftBinaryCLIService exte minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); - workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME); + workerKeepAliveTime = hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS); String threadPoolName = "HiveServer2-Handler-Pool"; ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1622396&r1=1622395&r2=1622396&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Thu Sep 4 02:49:46 2014 @@ -61,7 +61,7 @@ public abstract class ThriftCLIService e protected int minWorkerThreads; protected int maxWorkerThreads; - protected int workerKeepAliveTime; + protected long workerKeepAliveTime; protected static HiveAuthFactory hiveAuthFactory; Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1622396&r1=1622395&r2=1622396&view=diff ============================================================================== --- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original) +++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Thu Sep 4 02:49:46 2014 @@ -69,7 +69,8 @@ public class ThriftHttpCLIService extend minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); - workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME); + workerKeepAliveTime = hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS); String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); @@ -110,7 +111,8 @@ public class ThriftHttpCLIService extend // Linux:yes, Windows:no connector.setReuseAddress(!Shell.WINDOWS); - int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME); + int maxIdleTime = (int) hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS); connector.setMaxIdleTime(maxIdleTime); httpServer.addConnector(connector); Modified: hive/branches/cbo/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1622396&r1=1622395&r2=1622396&view=diff ============================================================================== --- hive/branches/cbo/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original) +++ hive/branches/cbo/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Thu Sep 4 02:49:46 2014 @@ -26,9 +26,9 @@ import static org.junit.Assert.fail; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -202,7 +202,8 @@ public abstract class CLIServiceTest { * to give a compile time error. * (compilation is done synchronous as of now) */ - longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + longPollingTimeout = HiveConf.getTimeVar(new HiveConf(), + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS); queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName; try { runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); @@ -295,7 +296,7 @@ public abstract class CLIServiceTest { long longPollingTimeDelta; OperationStatus opStatus = null; OperationState state = null; - confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout)); + confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms"); OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); int count = 0; while (true) { Modified: hive/branches/cbo/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1622396&r1=1622395&r2=1622396&view=diff ============================================================================== --- hive/branches/cbo/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original) +++ hive/branches/cbo/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Thu Sep 4 02:49:46 2014 @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.net.Socket; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.Locale; import java.util.Map; import javax.security.auth.callback.Callback; @@ -79,11 +80,23 @@ public class HadoopThriftAuthBridge20S e } @Override - public Client createClientWithConf(String authType) { - Configuration conf = new Configuration(); - conf.set(HADOOP_SECURITY_AUTHENTICATION, authType); - UserGroupInformation.setConfiguration(conf); - return new Client(); + public Client createClientWithConf(String authMethod) { + UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getLoginUser(); + } catch(IOException e) { + throw new IllegalStateException("Unable to get current login user: " + e, e); + } + if (loginUserHasCurrentAuthMethod(ugi, authMethod)) { + LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current."); + return new Client(); + } else { + LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current."); + Configuration conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod); + UserGroupInformation.setConfiguration(conf); + return new Client(); + } } @Override @@ -105,15 +118,48 @@ public class HadoopThriftAuthBridge20S e } @Override - public UserGroupInformation getCurrentUGIWithConf(String authType) + public UserGroupInformation getCurrentUGIWithConf(String authMethod) throws IOException { - Configuration conf = new Configuration(); - conf.set(HADOOP_SECURITY_AUTHENTICATION, authType); - UserGroupInformation.setConfiguration(conf); - return UserGroupInformation.getCurrentUser(); + UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch(IOException e) { + throw new IllegalStateException("Unable to get current user: " + e, e); + } + if (loginUserHasCurrentAuthMethod(ugi, authMethod)) { + LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current."); + return ugi; + } else { + LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current."); + Configuration conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod); + UserGroupInformation.setConfiguration(conf); + return UserGroupInformation.getCurrentUser(); + } } /** + * Return true if the current login user is already using the given authMethod. + * + * Used above to ensure we do not create a new Configuration object and as such + * lose other settings such as the cluster to which the JVM is connected. Required + * for oozie since it does not have a core-site.xml see HIVE-7682 + */ + private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) { + AuthenticationMethod authMethod; + try { + // based on SecurityUtil.getAuthenticationMethod() + authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException iae) { + throw new IllegalArgumentException("Invalid attribute value for " + + HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae); + } + LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod()); + return ugi.getAuthenticationMethod().equals(authMethod); + } + + + /** * Read and return Hadoop SASL configuration which can be configured using * "hadoop.rpc.protection" * @param conf
