Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Tue Sep 2 04:41:29 2014 @@ -43,6 +43,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * A class to initiate compactions. This will run in a separate thread. @@ -50,7 +51,6 @@ import java.util.Set; public class Initiator extends CompactorThread { static final private String CLASS_NAME = Initiator.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); - static final private int threadId = 10000; static final private String NO_COMPACTION = "NO_AUTO_COMPACTION"; @@ -63,7 +63,7 @@ public class Initiator extends Compactor try { recoverFailedCompactions(false); - int abortedThreashold = HiveConf.getIntVar(conf, + int abortedThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD); // Make sure we run through the loop once before checking to stop as this makes testing @@ -77,7 +77,7 @@ public class Initiator extends Compactor try { ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns()); - Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreashold); + Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold); LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); for (CompactionInfo ci : potentials) { @@ -140,13 +140,13 @@ public class Initiator extends Compactor public void init(BooleanPointer stop) throws MetaException { super.init(stop); checkInterval = - HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL) * 1000; + conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ; } private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { if (!remoteOnly) txnHandler.revokeFromLocalWorkers(Worker.hostname()); - txnHandler.revokeTimedoutWorkers(HiveConf.getLongVar(conf, - HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT)); + txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS)); } // Figure out if there are any currently running compactions on the same table or partition.
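[Note] The Initiator hunk above is representative of the conversion applied throughout this commit: time-valued settings move from getLongVar/getIntVar plus hand-written unit scaling to getTimeVar with an explicit TimeUnit. A minimal before/after sketch, assuming a HiveConf instance named conf and the usual org.apache.hadoop.hive.conf.HiveConf / java.util.concurrent.TimeUnit imports (local variable names are illustrative only):

    // Old style: a bare long documented as seconds, scaled to millis by hand.
    long checkIntervalMs =
        HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL) * 1000;

    // New style: the config value carries its own unit (e.g. "300s" or "5m") and
    // getTimeVar converts it to whatever unit the caller requests.
    long checkIntervalMs2 =
        conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS);

Both the static form HiveConf.getTimeVar(conf, var, unit) and the instance form conf.getTimeVar(var, unit) appear in this patch; they read the same underlying property.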
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java Tue Sep 2 04:41:29 2014 @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Tests for the compactor Initiator thread. @@ -89,7 +90,7 @@ public class TestInitiator extends Compa txnHandler.findNextToCompact("nosuchhost-193892"); HiveConf conf = new HiveConf(); - HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L); + conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS); startInitiator(conf); Modified: hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out Tue Sep 2 04:41:29 2014 @@ -7,4 +7,4 @@ PREHOOK: query: show conf "hive.stats.re PREHOOK: type: SHOWCONF POSTHOOK: query: show conf "hive.stats.retries.wait" POSTHOOK: type: SHOWCONF -3000 INT The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures baseWindow * (failure 1) * (random number between [0.0,1.0]). +3000ms STRING(TIME) Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is msec if not specified. The base waiting window before the next retry. The actual wait time is calculated by baseWindow * failures baseWindow * (failure + 1) * (random number between [0.0,1.0]). 
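[Note] The show_conf.q.out change above is the user-visible side of the conversion: time-valued settings now report as STRING(TIME), are written with a unit suffix, and fall back to the variable's declared default unit when no suffix is given. A short sketch of the two ways such a value can now be supplied, mirroring the TestInitiator change above (a hypothetical test-style fragment; assumes HiveConf and TimeUnit are imported):

    HiveConf conf = new HiveConf();

    // Programmatic form with an explicit TimeUnit, as TestInitiator does above.
    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);

    // String form with a unit suffix (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec).
    conf.set(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT.varname, "1s");

    // Reads always convert to the unit the caller asks for.
    long timeoutMs =
        conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS);

The same string form is what the CLIServiceTest change further below relies on when it passes longPollingTimeout + "ms" through confOverlay.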
Modified: hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java (original) +++ hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java Tue Sep 2 04:41:29 2014 @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.logging.Log; @@ -62,8 +63,6 @@ import org.apache.thrift.transport.TServ import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import com.facebook.fb303.fb_status; /** @@ -670,8 +669,11 @@ public class HiveServer extends ThriftHi boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.SERVER_TCP_KEEP_ALIVE); + int timeout = (int) HiveConf.getTimeVar( + conf, HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); - TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(cli.port) : new TServerSocket(cli.port, 1000 * conf.getIntVar(HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT)); + TServerTransport serverTransport = + tcpKeepAlive ? new TServerSocketKeepAlive(cli.port) : new TServerSocket(cli.port, timeout); // set all properties specified on the command line for (Map.Entry<Object, Object> item : hiveconf.entrySet()) { Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Tue Sep 2 04:41:29 2014 @@ -362,8 +362,9 @@ public class CLIService extends Composit * However, if the background operation is complete, we return immediately. 
*/ if (operation.shouldRunAsync()) { - long timeout = operation.getParentSession().getHiveConf().getLongVar( - HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + HiveConf conf = operation.getParentSession().getHiveConf(); + long timeout = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS); try { operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java Tue Sep 2 04:41:29 2014 @@ -25,29 +25,26 @@ import org.apache.hive.service.cli.thrif * */ public enum OperationState { - INITIALIZED(TOperationState.INITIALIZED_STATE), - RUNNING(TOperationState.RUNNING_STATE), - FINISHED(TOperationState.FINISHED_STATE), - CANCELED(TOperationState.CANCELED_STATE), - CLOSED(TOperationState.CLOSED_STATE), - ERROR(TOperationState.ERROR_STATE), - UNKNOWN(TOperationState.UKNOWN_STATE), - PENDING(TOperationState.PENDING_STATE); + INITIALIZED(TOperationState.INITIALIZED_STATE, false), + RUNNING(TOperationState.RUNNING_STATE, false), + FINISHED(TOperationState.FINISHED_STATE, true), + CANCELED(TOperationState.CANCELED_STATE, true), + CLOSED(TOperationState.CLOSED_STATE, true), + ERROR(TOperationState.ERROR_STATE, true), + UNKNOWN(TOperationState.UKNOWN_STATE, false), + PENDING(TOperationState.PENDING_STATE, false); private final TOperationState tOperationState; + private final boolean terminal; - OperationState(TOperationState tOperationState) { + OperationState(TOperationState tOperationState, boolean terminal) { this.tOperationState = tOperationState; + this.terminal = terminal; } + // must be sync with TOperationState in order public static OperationState getOperationState(TOperationState tOperationState) { - // TODO: replace this with a Map? 
- for (OperationState opState : values()) { - if (tOperationState.equals(opState.tOperationState)) { - return opState; - } - } - return OperationState.UNKNOWN; + return OperationState.values()[tOperationState.getValue()]; } public static void validateTransition(OperationState oldState, @@ -91,7 +88,8 @@ public enum OperationState { default: // fall-through } - throw new HiveSQLException("Illegal Operation state transition"); + throw new HiveSQLException("Illegal Operation state transition " + + "from " + oldState + " to " + newState); } public void validateTransition(OperationState newState) @@ -102,4 +100,8 @@ public enum OperationState { public TOperationState toTOperationState() { return tOperationState; } + + public boolean isTerminal() { + return terminal; + } } Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Tue Sep 2 04:41:29 2014 @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.util.EnumSet; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,14 +53,19 @@ public abstract class Operation { protected OperationLog operationLog; protected boolean isOperationLogEnabled; + private long operationTimeout; + private long lastAccessTime; + protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) { - super(); this.parentSession = parentSession; this.runAsync = runInBackground; this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); + lastAccessTime = System.currentTimeMillis(); + operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), + HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); } public Future<?> getBackgroundHandle() { @@ -111,7 +117,6 @@ public abstract class Operation { opHandle.setHasResultSet(hasResultSet); } - public OperationLog getOperationLog() { return operationLog; } @@ -119,9 +124,33 @@ public abstract class Operation { protected final OperationState setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); this.state = newState; + this.lastAccessTime = System.currentTimeMillis(); return this.state; } + public boolean isTimedOut(long current) { + if (operationTimeout == 0) { + return false; + } + if (operationTimeout > 0) { + // check only when it's in terminal state + return state.isTerminal() && lastAccessTime + operationTimeout <= current; + } + return lastAccessTime + -operationTimeout <= current; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public long getOperationTimeout() { + return operationTimeout; + } + + public void setOperationTimeout(long operationTimeout) { + this.operationTimeout = operationTimeout; + } + protected void setOperationException(HiveSQLException operationException) { this.operationException = operationException; } @@ -130,6 
+159,7 @@ public abstract class Operation { if (this.state != state) { throw new HiveSQLException("Expected state " + state + ", but found " + this.state); } + this.lastAccessTime = System.currentTimeMillis(); } public boolean isRunning() { Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Sep 2 04:41:29 2014 @@ -19,6 +19,7 @@ package org.apache.hive.service.cli.operation; import java.util.Enumeration; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -155,15 +156,27 @@ public class OperationManager extends Ab return operation; } - public synchronized Operation getOperation(OperationHandle operationHandle) - throws HiveSQLException { - Operation operation = handleToOperation.get(operationHandle); + public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException { + Operation operation = getOperationInternal(operationHandle); if (operation == null) { throw new HiveSQLException("Invalid OperationHandle: " + operationHandle); } return operation; } + private synchronized Operation getOperationInternal(OperationHandle operationHandle) { + return handleToOperation.get(operationHandle); + } + + private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) { + Operation operation = handleToOperation.get(operationHandle); + if (operation != null && operation.isTimedOut(System.currentTimeMillis())) { + handleToOperation.remove(operationHandle); + return operation; + } + return null; + } + private synchronized void addOperation(Operation operation) { handleToOperation.put(operation.getHandle(), operation); } @@ -252,4 +265,16 @@ public class OperationManager extends Ab public OperationLog getOperationLogByThread() { return OperationLog.getCurrentOperationLog(); } + + public List<Operation> removeExpiredOperations(OperationHandle[] handles) { + List<Operation> removed = new ArrayList<Operation>(); + for (OperationHandle handle : handles) { + Operation operation = removeTimedOutOperation(handle); + if (operation != null) { + LOG.warn("Operation " + handle + " is timed-out and will be closed"); + removed.add(operation); + } + } + return removed; + } } Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Tue Sep 2 04:41:29 2014 @@ -27,9 +27,9 @@ import org.apache.hive.service.cli.*; public interface HiveSession extends HiveSessionBase { - public void open(); + void open(); - public IMetaStoreClient getMetaStoreClient() throws HiveSQLException; + IMetaStoreClient getMetaStoreClient() throws HiveSQLException; /** * getInfo operation handler @@ -37,7 +37,7 @@ public interface 
HiveSession extends Hiv * @return * @throws HiveSQLException */ - public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException; + GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException; /** * execute operation handler @@ -46,7 +46,7 @@ public interface HiveSession extends Hiv * @return * @throws HiveSQLException */ - public OperationHandle executeStatement(String statement, + OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException; /** @@ -56,7 +56,7 @@ public interface HiveSession extends Hiv * @return * @throws HiveSQLException */ - public OperationHandle executeStatementAsync(String statement, + OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay) throws HiveSQLException; /** @@ -64,14 +64,14 @@ public interface HiveSession extends Hiv * @return * @throws HiveSQLException */ - public OperationHandle getTypeInfo() throws HiveSQLException; + OperationHandle getTypeInfo() throws HiveSQLException; /** * getCatalogs operation handler * @return * @throws HiveSQLException */ - public OperationHandle getCatalogs() throws HiveSQLException; + OperationHandle getCatalogs() throws HiveSQLException; /** * getSchemas operation handler @@ -80,7 +80,7 @@ public interface HiveSession extends Hiv * @return * @throws HiveSQLException */ - public OperationHandle getSchemas(String catalogName, String schemaName) + OperationHandle getSchemas(String catalogName, String schemaName) throws HiveSQLException; /** @@ -92,7 +92,7 @@ public interface HiveSession extends Hiv * @return * @throws HiveSQLException */ - public OperationHandle getTables(String catalogName, String schemaName, + OperationHandle getTables(String catalogName, String schemaName, String tableName, List<String> tableTypes) throws HiveSQLException; /** @@ -100,7 +100,7 @@ public interface HiveSession extends Hiv * @return * @throws HiveSQLException */ - public OperationHandle getTableTypes() throws HiveSQLException ; + OperationHandle getTableTypes() throws HiveSQLException ; /** * getColumns operation handler @@ -111,7 +111,7 @@ public interface HiveSession extends Hiv * @return * @throws HiveSQLException */ - public OperationHandle getColumns(String catalogName, String schemaName, + OperationHandle getColumns(String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException; /** @@ -122,31 +122,33 @@ public interface HiveSession extends Hiv * @return * @throws HiveSQLException */ - public OperationHandle getFunctions(String catalogName, String schemaName, + OperationHandle getFunctions(String catalogName, String schemaName, String functionName) throws HiveSQLException; /** * close the session * @throws HiveSQLException */ - public void close() throws HiveSQLException; + void close() throws HiveSQLException; - public void cancelOperation(OperationHandle opHandle) throws HiveSQLException; + void cancelOperation(OperationHandle opHandle) throws HiveSQLException; - public void closeOperation(OperationHandle opHandle) throws HiveSQLException; + void closeOperation(OperationHandle opHandle) throws HiveSQLException; - public TableSchema getResultSetMetadata(OperationHandle opHandle) + TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException; - public String 
getDelegationToken(HiveAuthFactory authFactory, String owner, + String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException; - public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr) + void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; - public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr) + void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; + + void closeExpiredOperations(); } Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java Tue Sep 2 04:41:29 2014 @@ -92,4 +92,6 @@ public interface HiveSessionBase { String getIpAddress(); void setIpAddress(String ipAddress); + + long getLastAccessTime(); } Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Sep 2 04:41:29 2014 @@ -54,6 +54,7 @@ import org.apache.hive.service.cli.opera import org.apache.hive.service.cli.operation.GetTableTypesOperation; import org.apache.hive.service.cli.operation.GetTypeInfoOperation; import org.apache.hive.service.cli.operation.MetadataOperation; +import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; import org.apache.hive.service.server.ThreadWithGarbageCleanup; @@ -84,6 +85,8 @@ public class HiveSessionImpl implements private boolean isOperationLogEnabled; private File sessionLogDir; + private volatile long lastAccessTime; + public HiveSessionImpl(TProtocolVersion protocol, String username, String password, HiveConf serverhiveConf, String ipAddress) { this.username = username; @@ -108,6 +111,8 @@ public class HiveSessionImpl implements sessionState = new SessionState(hiveConf, username); sessionState.setUserIpAddress(ipAddress); sessionState.setIsHiveServerQuery(true); + + lastAccessTime = System.currentTimeMillis(); SessionState.start(sessionState); } @@ -239,10 +244,13 @@ public class HiveSessionImpl implements SessionState.start(sessionState); } - protected synchronized void acquire() throws HiveSQLException { + protected synchronized void acquire(boolean userAccess) { // Need to make sure that the this HiveServer2's session's session state is // stored in the thread local for the handler thread. SessionState.setCurrentSessionState(sessionState); + if (userAccess) { + lastAccessTime = System.currentTimeMillis(); + } } /** @@ -252,14 +260,16 @@ public class HiveSessionImpl implements * when this thread is garbage collected later. 
* @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() */ - protected synchronized void release() { - assert sessionState != null; + protected synchronized void release(boolean userAccess) { SessionState.detachSession(); if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { ThreadWithGarbageCleanup currentThread = (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); currentThread.cacheThreadLocalRawStore(); } + if (userAccess) { + lastAccessTime = System.currentTimeMillis(); + } } @Override @@ -298,7 +308,7 @@ public class HiveSessionImpl implements @Override public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException { - acquire(); + acquire(true); try { switch (getInfoType) { case CLI_SERVER_NAME: @@ -318,7 +328,7 @@ public class HiveSessionImpl implements throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString()); } } finally { - release(); + release(true); } } @@ -337,7 +347,7 @@ public class HiveSessionImpl implements private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); ExecuteStatementOperation operation = operationManager @@ -355,14 +365,14 @@ public class HiveSessionImpl implements } throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getTypeInfo() throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession()); @@ -375,14 +385,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getCatalogs() throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession()); @@ -395,14 +405,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getSchemas(String catalogName, String schemaName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetSchemasOperation operation = @@ -416,7 +426,7 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @@ -424,7 +434,7 @@ public class HiveSessionImpl implements public OperationHandle getTables(String catalogName, String schemaName, String tableName, List<String> tableTypes) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); MetadataOperation operation = @@ -438,14 +448,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getTableTypes() throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession()); @@ -458,14 +468,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + 
release(true); } } @Override public OperationHandle getColumns(String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(), @@ -479,14 +489,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getFunctions(String catalogName, String schemaName, String functionName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetFunctionsOperation operation = operationManager @@ -500,14 +510,14 @@ public class HiveSessionImpl implements operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public void close() throws HiveSQLException { try { - acquire(); + acquire(true); /** * For metadata operations like getTables(), getColumns() etc, * the session allocates a private metastore handler which should be @@ -532,7 +542,7 @@ public class HiveSessionImpl implements } catch (IOException ioe) { throw new HiveSQLException("Failure to close", ioe); } finally { - release(); + release(true); } } @@ -562,50 +572,79 @@ public class HiveSessionImpl implements } @Override + public long getLastAccessTime() { + return lastAccessTime; + } + + @Override + public void closeExpiredOperations() { + OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]); + if (handles.length > 0) { + List<Operation> operations = operationManager.removeExpiredOperations(handles); + if (!operations.isEmpty()) { + closeTimedOutOperations(operations); + } + } + } + + private void closeTimedOutOperations(List<Operation> operations) { + acquire(false); + try { + for (Operation operation : operations) { + opHandleSet.remove(operation.getHandle()); + try { + operation.close(); + } catch (Exception e) { + LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e); + } + } + } finally { + release(false); + } + } + + @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { sessionManager.getOperationManager().cancelOperation(opHandle); } finally { - release(); + release(true); } } @Override public void closeOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { operationManager.closeOperation(opHandle); opHandleSet.remove(opHandle); } finally { - release(); + release(true); } } @Override public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle); } finally { - release(); + release(true); } } @Override public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException { - acquire(); + acquire(true); try { if (fetchType == FetchType.QUERY_OUTPUT) { - return sessionManager.getOperationManager() - .getOperationNextRowSet(opHandle, orientation, maxRows); - } else { - return sessionManager.getOperationManager() - .getOperationLogRowSet(opHandle, orientation, maxRows); + return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows); } + return operationManager.getOperationLogRowSet(opHandle, 
orientation, maxRows); } finally { - release(); + release(true); } } Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Tue Sep 2 04:41:29 2014 @@ -19,7 +19,6 @@ package org.apache.hive.service.cli.session; import java.io.IOException; -import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -83,8 +82,8 @@ public class HiveSessionImplwithUGI exte } @Override - protected synchronized void acquire() throws HiveSQLException { - super.acquire(); + protected synchronized void acquire(boolean userAccess) { + super.acquire(userAccess); // if we have a metastore connection with impersonation, then set it first if (sessionHive != null) { Hive.set(sessionHive); @@ -98,11 +97,11 @@ public class HiveSessionImplwithUGI exte @Override public void close() throws HiveSQLException { try { - acquire(); + acquire(true); ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi); cancelDelegationToken(); } finally { - release(); + release(true); super.close(); } } Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Sep 2 04:41:29 2014 @@ -20,6 +20,8 @@ package org.apache.hive.service.cli.sess import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -59,6 +61,11 @@ public class SessionManager extends Comp private boolean isOperationLogEnabled; private File operationLogRootDir; + private long checkInterval; + private long sessionTimeout; + + private volatile boolean shutdown; + public SessionManager() { super("SessionManager"); } @@ -81,20 +88,28 @@ public class SessionManager extends Comp } private void createBackgroundOperationPool() { - int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize); - int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); - LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize); - int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME); - LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime); - // Create a thread pool with #backgroundPoolSize threads + int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); + LOG.info("HiveServer2: Background operation thread pool size: " + poolSize); + int poolQueueSize = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); + LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize); + long keepAliveTime = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS); + LOG.info( + "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds"); + + // Create a thread pool with #poolSize threads // Threads terminate when they are idle for more than the keepAliveTime - // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize String threadPoolName = "HiveServer2-Background-Pool"; - backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, - keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize), + backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize, + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize), new ThreadFactoryWithGarbageCleanup(threadPoolName)); backgroundOperationPool.allowCoreThreadTimeOut(true); + + checkInterval = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + sessionTimeout = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); } private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException { @@ -139,20 +154,61 @@ public class SessionManager extends Comp @Override public synchronized void start() { super.start(); + if (checkInterval > 0) { + startTimeoutChecker(); + } + } + + private void startTimeoutChecker() { + final long interval = Math.max(checkInterval, 3000l); // minimum 3 seconds + Runnable timeoutChecker = new Runnable() { + @Override + public void run() { + for (sleepInterval(interval); !shutdown; sleepInterval(interval)) { + long current = System.currentTimeMillis(); + for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) { + if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) { + SessionHandle handle = session.getSessionHandle(); + LOG.warn("Session " + handle + " is Timed-out (last access : " + + new Date(session.getLastAccessTime()) + ") and will be closed"); + try { + closeSession(handle); + } catch (HiveSQLException e) { + LOG.warn("Exception is thrown closing session " + handle, e); + } + } else { + session.closeExpiredOperations(); + } + } + } + } + + private void sleepInterval(long interval) { + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + // ignore + } + } + }; + backgroundOperationPool.execute(timeoutChecker); } @Override public synchronized void stop() { super.stop(); + shutdown = true; if (backgroundOperationPool != null) { backgroundOperationPool.shutdown(); - int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); + long timeout = hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); try { backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS); } catch (InterruptedException e) { LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + " seconds has been exceeded. 
RUNNING background operations will be shut down", e); } + backgroundOperationPool = null; } cleanupLoggingRootDir(); } Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Tue Sep 2 04:41:29 2014 @@ -70,7 +70,8 @@ public class ThriftBinaryCLIService exte minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); - workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME); + workerKeepAliveTime = hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS); String threadPoolName = "HiveServer2-Handler-Pool"; ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Sep 2 04:41:29 2014 @@ -61,7 +61,7 @@ public abstract class ThriftCLIService e protected int minWorkerThreads; protected int maxWorkerThreads; - protected int workerKeepAliveTime; + protected long workerKeepAliveTime; protected static HiveAuthFactory hiveAuthFactory; Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Tue Sep 2 04:41:29 2014 @@ -69,7 +69,8 @@ public class ThriftHttpCLIService extend minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); - workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME); + workerKeepAliveTime = hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS); String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); @@ -110,7 +111,8 @@ public class ThriftHttpCLIService extend // Linux:yes, Windows:no connector.setReuseAddress(!Shell.WINDOWS); - int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME); + int maxIdleTime = (int) hiveConf.getTimeVar( 
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS); connector.setMaxIdleTime(maxIdleTime); httpServer.addConnector(connector); Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1621912&r1=1621911&r2=1621912&view=diff ============================================================================== --- hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original) +++ hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Tue Sep 2 04:41:29 2014 @@ -26,9 +26,9 @@ import static org.junit.Assert.fail; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -202,7 +202,8 @@ public abstract class CLIServiceTest { * to give a compile time error. * (compilation is done synchronous as of now) */ - longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + longPollingTimeout = HiveConf.getTimeVar(new HiveConf(), + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS); queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName; try { runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); @@ -295,7 +296,7 @@ public abstract class CLIServiceTest { long longPollingTimeDelta; OperationStatus opStatus = null; OperationState state = null; - confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout)); + confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms"); OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); int count = 0; while (true) {
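[Note] One semantic detail of the new idle-operation timeout (Operation.java above) that is easy to miss: the sign of HIVE_SERVER2_IDLE_OPERATION_TIMEOUT (hive.server2.idle.operation.timeout) controls what may expire. A commented restatement of Operation.isTimedOut from this patch, with stateIsTerminal standing in for state.isTerminal():

    boolean isTimedOut(long current, long operationTimeout, long lastAccessTime,
                       boolean stateIsTerminal) {
      if (operationTimeout == 0) {
        return false;                      // 0 disables idle-operation expiration
      }
      if (operationTimeout > 0) {
        // positive value: only operations already in a terminal state
        // (FINISHED, CANCELED, CLOSED, ERROR) are eligible to expire
        return stateIsTerminal && lastAccessTime + operationTimeout <= current;
      }
      // negative value: any operation idle longer than |operationTimeout| expires,
      // regardless of state
      return lastAccessTime - operationTimeout <= current;
    }

SessionManager's timeout checker (see the SessionManager.java hunk above) runs every HIVE_SERVER2_SESSION_CHECK_INTERVAL, with a 3-second floor; it closes a session outright once HIVE_SERVER2_IDLE_SESSION_TIMEOUT has elapsed since its last access, and otherwise calls closeExpiredOperations() on it so that expired operations are cleaned up without any client action.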
