zhoukang created SPARK-26751:
--------------------------------
Summary: HiveSessionImpl might have memory leak since Operation do
not close properly
Key: SPARK-26751
URL: https://issues.apache.org/jira/browse/SPARK-26751
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.4.0
Reporter: zhoukang
When a statement runs in the background and throws an exception that is not a
HiveSQLException, we may encounter a memory leak, because the operation's entry
in handleToOperation is never removed.
The reason is as follows:
1. When calling operation.run(), an exception that is not a HiveSQLException
is thrown.
2. The opHandle is then never added to opHandleSet, and
operationManager.closeOperation(opHandle) is not called.
{code:java}
/**
 * Creates an ExecuteStatementOperation for the statement, runs it, and returns its handle.
 *
 * The handle is recorded in opHandleSet only after operation.run() returns normally.
 * If run() throws HiveSQLException, the operation is closed through the OperationManager
 * before the exception is rethrown. Any other exception type is not caught here: it
 * propagates without closeOperation being called and without the handle being added
 * to opHandleSet.
 *
 * @param statement   statement text passed to the new operation
 * @param confOverlay configuration map passed to the new operation
 * @param runAsync    passed through to newExecuteStatementOperation
 * @return the handle of the operation that was run
 * @throws HiveSQLException if operation.run() fails with it (rethrown after the
 *                          operation has been closed)
 */
private OperationHandle executeStatementInternal(String statement, Map<String,
String> confOverlay, boolean runAsync) throws HiveSQLException {
// Paired with release(true) in the finally block below.
this.acquire(true);
OperationManager operationManager = this.getOperationManager();
ExecuteStatementOperation operation =
operationManager.newExecuteStatementOperation(this.getSession(), statement,
confOverlay, runAsync);
OperationHandle opHandle = operation.getHandle();
OperationHandle e;
try {
operation.run();
// Only reached when run() did not throw; this is the sole place the handle is tracked.
this.opHandleSet.add(opHandle);
e = opHandle;
} catch (HiveSQLException var11) {
// Only HiveSQLException triggers cleanup; other exception types skip this handler.
operationManager.closeOperation(opHandle);
throw var11;
} finally {
this.release(true);
}
return e;
}
try {
// This submit blocks if no background threads are available to run
this operation
val backgroundHandle =
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
logError(s"Error executing query in background", e)
setState(OperationState.ERROR)
throw e
}
}
{code}
3. When we close the session, operationManager.closeOperation(opHandle) is
also not called for this operation, since its opHandle was never added to
opHandleSet.
{code}
/**
 * Closes this session: closes every operation whose handle is in opHandleSet, clears
 * the set, cleans up the session log directory and pipeout file, closes the Hive
 * history stream if there is one, and closes the session state.
 *
 * Only handles present in opHandleSet are passed to closeOperation; an operation whose
 * handle was never added to that set is not closed by this method.
 *
 * @throws HiveSQLException wrapping an IOException raised during the close sequence
 */
public void close() throws HiveSQLException {
try {
// Paired with release(true) in the outer finally block.
this.acquire(true);
Iterator ioe = this.opHandleSet.iterator();
// Close each tracked operation through the operation manager.
while(ioe.hasNext()) {
OperationHandle opHandle = (OperationHandle)ioe.next();
this.operationManager.closeOperation(opHandle);
}
this.opHandleSet.clear();
this.cleanupSessionLogDir();
this.cleanupPipeoutFile();
HiveHistory ioe1 = this.sessionState.getHiveHistory();
if(null != ioe1) {
ioe1.closeStream();
}
try {
this.sessionState.close();
} finally {
// Clear the reference even if close() threw, so the outer finally does not retry it.
this.sessionState = null;
}
} catch (IOException var17) {
throw new HiveSQLException("Failure to close", var17);
} finally {
// sessionState is still non-null here only if an earlier step threw before the
// inner try/finally above cleared it; make a last attempt to close it.
if(this.sessionState != null) {
try {
this.sessionState.close();
} catch (Throwable var15) {
// Failure of this fallback close is logged, not rethrown.
LOG.warn("Error closing session", var15);
}
this.sessionState = null;
}
this.release(true);
}
}
{code}
4. However, the opHandle is added to handleToOperation for every statement:
{code}
// Map from OperationHandle to Operation, obtained through ReflectionUtils.getSuperField
// with the field name "handleToOperation" (presumably the superclass's own field,
// read reflectively -- confirm against ReflectionUtils).
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
// Map from SessionHandle to a String (presumably a scheduler pool name, judging by the
// identifier -- confirm).
val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
// Map from SessionHandle to the SQLContext used for that session.
val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
/**
 * Builds a SparkExecuteStatementOperation for the statement and registers it in
 * handleToOperation under its own handle. The whole body runs inside `synchronized`.
 *
 * Registration happens here, at creation time, for every statement; nothing in this
 * method removes the entry again.
 *
 * @param parentSession session issuing the statement; its handle is used to look up
 *                      the SQLContext in sessionToContexts
 * @param statement     statement text passed to the new operation
 * @param confOverlay   configuration map passed to the new operation
 * @param async         background execution is used only if this is true and the
 *                      HIVE_THRIFT_SERVER_ASYNC conf is also true
 * @return the newly created operation
 * @throws IllegalArgumentException (from `require`) if no SQLContext is registered
 *                                  for the session handle
 */
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
require(sqlContext != null, s"Session handle:
${parentSession.getSessionHandle} has not been" +
s" initialized or had already closed.")
val conf = sqlContext.sessionState.conf
val hiveSessionState = parentSession.getSessionState
// setConfMap is defined elsewhere; presumably it copies each map entry into conf -- confirm.
setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
setConfMap(conf, hiveSessionState.getHiveVariables)
val runInBackground = async &&
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
val operation = new SparkExecuteStatementOperation(parentSession,
statement, confOverlay,
runInBackground)(sqlContext, sessionToActivePool)
// Track the operation by handle as soon as it is created.
handleToOperation.put(operation.getHandle, operation)
logDebug(s"Created Operation for $statement with session=$parentSession, " +
s"runInBackground=$runInBackground")
operation
}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]