[ https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhoukang updated SPARK-26751: ----------------------------- Description: When a statement runs in the background and throws an exception that is not a HiveSQLException, we may encounter a memory leak, since the operation's entry in handleToOperation will not be removed correctly. The reason is as follows: 1. When calling operation.run, an exception that is not a HiveSQLException is thrown. 2. Then the opHandle is not added to opHandleSet, and operationManager.closeOperation(opHandle) is not called, because the catch block only handles HiveSQLException. {code:java} private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException { this.acquire(true); OperationManager operationManager = this.getOperationManager(); ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync); OperationHandle opHandle = operation.getHandle(); OperationHandle e; try { operation.run(); this.opHandleSet.add(opHandle); e = opHandle; } catch (HiveSQLException var11) { operationManager.closeOperation(opHandle); throw var11; } finally { this.release(true); } return e; } try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) throw e } } {code} 3. When we close the session, operationManager.closeOperation(opHandle) will also not be called for this opHandle, since we did not add this opHandle into the opHandleSet. 
{code} public void close() throws HiveSQLException { try { this.acquire(true); Iterator ioe = this.opHandleSet.iterator(); while(ioe.hasNext()) { OperationHandle opHandle = (OperationHandle)ioe.next(); this.operationManager.closeOperation(opHandle); } this.opHandleSet.clear(); this.cleanupSessionLogDir(); this.cleanupPipeoutFile(); HiveHistory ioe1 = this.sessionState.getHiveHistory(); if(null != ioe1) { ioe1.closeStream(); } try { this.sessionState.close(); } finally { this.sessionState = null; } } catch (IOException var17) { throw new HiveSQLException("Failure to close", var17); } finally { if(this.sessionState != null) { try { this.sessionState.close(); } catch (Throwable var15) { LOG.warn("Error closing session", var15); } this.sessionState = null; } this.release(true); } } {code} 4. However, the opHandle is still added to handleToOperation for every statement, so its entry is never removed: {code} val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") val conf = sqlContext.sessionState.conf val hiveSessionState = parentSession.getSessionState setConfMap(conf, hiveSessionState.getOverriddenConfigurations) setConfMap(conf, hiveSessionState.getHiveVariables) val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)(sqlContext, sessionToActivePool) 
handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") operation } {code} Below is an example which has memory leak: !26751.png! was: When we run in background and we get exception which is not HiveSQLException, we may encounter memory leak since handleToOperation will not removed correctly. The reason is below: 1. when calling operation.run we throw an exception which is not HiveSQLException 2. then opHandleSet will not add the opHandle, and operationManager.closeOperation(opHandle); will not be called {code:java} private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException { this.acquire(true); OperationManager operationManager = this.getOperationManager(); ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync); OperationHandle opHandle = operation.getHandle(); OperationHandle e; try { operation.run(); this.opHandleSet.add(opHandle); e = opHandle; } catch (HiveSQLException var11) { operationManager.closeOperation(opHandle); throw var11; } finally { this.release(true); } return e; } try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) throw e } } {code} 3. when we close the session we will also call operationManager.closeOperation(opHandle),since we did not add this opHandle into the opHandleSet. 
{code} public void close() throws HiveSQLException { try { this.acquire(true); Iterator ioe = this.opHandleSet.iterator(); while(ioe.hasNext()) { OperationHandle opHandle = (OperationHandle)ioe.next(); this.operationManager.closeOperation(opHandle); } this.opHandleSet.clear(); this.cleanupSessionLogDir(); this.cleanupPipeoutFile(); HiveHistory ioe1 = this.sessionState.getHiveHistory(); if(null != ioe1) { ioe1.closeStream(); } try { this.sessionState.close(); } finally { this.sessionState = null; } } catch (IOException var17) { throw new HiveSQLException("Failure to close", var17); } finally { if(this.sessionState != null) { try { this.sessionState.close(); } catch (Throwable var15) { LOG.warn("Error closing session", var15); } this.sessionState = null; } this.release(true); } } {code} 4. however, the opHandle will added into handleToOperation for each statement {code} val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") val conf = sqlContext.sessionState.conf val hiveSessionState = parentSession.getSessionState setConfMap(conf, hiveSessionState.getOverriddenConfigurations) setConfMap(conf, hiveSessionState.getHiveVariables) val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)(sqlContext, sessionToActivePool) 
handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") operation } {code} Below is an example which has memory leak: > HiveSessionImpl might have memory leak since Operation do not close properly > ---------------------------------------------------------------------------- > > Key: SPARK-26751 > URL: https://issues.apache.org/jira/browse/SPARK-26751 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > Attachments: 26751.png > > > When we run in background and we get exception which is not HiveSQLException, > we may encounter memory leak since handleToOperation will not removed > correctly. > The reason is below: > 1. when calling operation.run we throw an exception which is not > HiveSQLException > 2. then opHandleSet will not add the opHandle, and > operationManager.closeOperation(opHandle); will not be called > {code:java} > private OperationHandle executeStatementInternal(String statement, > Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException { > this.acquire(true); > OperationManager operationManager = this.getOperationManager(); > ExecuteStatementOperation operation = > operationManager.newExecuteStatementOperation(this.getSession(), statement, > confOverlay, runAsync); > OperationHandle opHandle = operation.getHandle(); > OperationHandle e; > try { > operation.run(); > this.opHandleSet.add(opHandle); > e = opHandle; > } catch (HiveSQLException var11) { > operationManager.closeOperation(opHandle); > throw var11; > } finally { > this.release(true); > } > return e; > } > try { > // This submit blocks if no background threads are available to run > this operation > val backgroundHandle = > > parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) > setBackgroundHandle(backgroundHandle) > } catch { > case rejected: 
RejectedExecutionException => > setState(OperationState.ERROR) > throw new HiveSQLException("The background threadpool cannot > accept" + > " new task for execution, please retry the operation", rejected) > case NonFatal(e) => > logError(s"Error executing query in background", e) > setState(OperationState.ERROR) > throw e > } > } > {code} > 3. when we close the session we will also call > operationManager.closeOperation(opHandle),since we did not add this opHandle > into the opHandleSet. > {code} > public void close() throws HiveSQLException { > try { > this.acquire(true); > Iterator ioe = this.opHandleSet.iterator(); > while(ioe.hasNext()) { > OperationHandle opHandle = (OperationHandle)ioe.next(); > this.operationManager.closeOperation(opHandle); > } > this.opHandleSet.clear(); > this.cleanupSessionLogDir(); > this.cleanupPipeoutFile(); > HiveHistory ioe1 = this.sessionState.getHiveHistory(); > if(null != ioe1) { > ioe1.closeStream(); > } > try { > this.sessionState.close(); > } finally { > this.sessionState = null; > } > } catch (IOException var17) { > throw new HiveSQLException("Failure to close", var17); > } finally { > if(this.sessionState != null) { > try { > this.sessionState.close(); > } catch (Throwable var15) { > LOG.warn("Error closing session", var15); > } > this.sessionState = null; > } > this.release(true); > } > } > {code} > 4. 
however, the opHandle will added into handleToOperation for each statement > {code} > val handleToOperation = ReflectionUtils > .getSuperField[JMap[OperationHandle, Operation]](this, > "handleToOperation") > val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() > val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() > override def newExecuteStatementOperation( > parentSession: HiveSession, > statement: String, > confOverlay: JMap[String, String], > async: Boolean): ExecuteStatementOperation = synchronized { > val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) > require(sqlContext != null, s"Session handle: > ${parentSession.getSessionHandle} has not been" + > s" initialized or had already closed.") > val conf = sqlContext.sessionState.conf > val hiveSessionState = parentSession.getSessionState > setConfMap(conf, hiveSessionState.getOverriddenConfigurations) > setConfMap(conf, hiveSessionState.getHiveVariables) > val runInBackground = async && > conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) > val operation = new SparkExecuteStatementOperation(parentSession, > statement, confOverlay, > runInBackground)(sqlContext, sessionToActivePool) > handleToOperation.put(operation.getHandle, operation) > logDebug(s"Created Operation for $statement with session=$parentSession, > " + > s"runInBackground=$runInBackground") > operation > } > {code} > Below is an example which has memory leak: > !26751.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org