zhoukang created SPARK-26751:
--------------------------------

             Summary: HiveSessionImpl might have a memory leak since Operations do not close properly
                 Key: SPARK-26751
                 URL: https://issues.apache.org/jira/browse/SPARK-26751
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: zhoukang


When we run a statement in background mode and it fails with an exception that is not a HiveSQLException, we may encounter a memory leak, since the corresponding entry in handleToOperation is never removed.
The reason is as follows:
1. operation.run() throws an exception that is not a HiveSQLException.
2. The opHandle is therefore never added to opHandleSet, and operationManager.closeOperation(opHandle) is not called either, because the catch block below only handles HiveSQLException:
{code:java}
private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException {
    this.acquire(true);
    OperationManager operationManager = this.getOperationManager();
    ExecuteStatementOperation operation =
        operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync);
    OperationHandle opHandle = operation.getHandle();

    OperationHandle e;
    try {
        operation.run();
        this.opHandleSet.add(opHandle);
        e = opHandle;
    } catch (HiveSQLException var11) {
        // cleanup only happens for HiveSQLException; any other exception skips it
        operationManager.closeOperation(opHandle);
        throw var11;
    } finally {
        this.release(true);
    }

    return e;
}
{code}

In SparkExecuteStatementOperation, the background submission rethrows any non-fatal error as-is, so the exception that reaches executeStatementInternal is not a HiveSQLException:

{code}
try {
  // This submit blocks if no background threads are available to run this operation
  val backgroundHandle =
    parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
  setBackgroundHandle(backgroundHandle)
} catch {
  case rejected: RejectedExecutionException =>
    setState(OperationState.ERROR)
    throw new HiveSQLException("The background threadpool cannot accept" +
      " new task for execution, please retry the operation", rejected)
  case NonFatal(e) =>
    logError(s"Error executing query in background", e)
    setState(OperationState.ERROR)
    throw e
}
{code}
3. When we later close the session, operationManager.closeOperation(opHandle) is still not called for this operation, because close() only iterates over opHandleSet and the opHandle was never added to it:
{code}
public void close() throws HiveSQLException {
        try {
            this.acquire(true);
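            // only handles that were added to opHandleSet are closed here,
            // so a handle that was never added (step 2 above) is never closed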
            Iterator ioe = this.opHandleSet.iterator();

            while(ioe.hasNext()) {
                OperationHandle opHandle = (OperationHandle)ioe.next();
                this.operationManager.closeOperation(opHandle);
            }

            this.opHandleSet.clear();
            this.cleanupSessionLogDir();
            this.cleanupPipeoutFile();
            HiveHistory ioe1 = this.sessionState.getHiveHistory();
            if(null != ioe1) {
                ioe1.closeStream();
            }

            try {
                this.sessionState.close();
            } finally {
                this.sessionState = null;
            }
        } catch (IOException var17) {
            throw new HiveSQLException("Failure to close", var17);
        } finally {
            if(this.sessionState != null) {
                try {
                    this.sessionState.close();
                } catch (Throwable var15) {
                    LOG.warn("Error closing session", var15);
                }

                this.sessionState = null;
            }

            this.release(true);
        }

    }
{code}
4. However, the opHandle was already added to handleToOperation when the statement was created (in Spark's SparkSQLOperationManager), so the Operation stays referenced there and leaks:
{code}
val handleToOperation = ReflectionUtils
    .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

  override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      async: Boolean): ExecuteStatementOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      s" initialized or had already closed.")
    val conf = sqlContext.sessionState.conf
    val hiveSessionState = parentSession.getSessionState
    setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
    setConfMap(conf, hiveSessionState.getHiveVariables)
    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
      runInBackground)(sqlContext, sessionToActivePool)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created Operation for $statement with session=$parentSession, " +
      s"runInBackground=$runInBackground")
    operation
  }
{code}
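
One possible direction for a fix, just a sketch of the idea and not an actual patch: make sure that whatever escapes the background submission in SparkExecuteStatementOperation is a HiveSQLException, so that the existing catch block in executeStatementInternal closes the operation, e.g.:

{code}
// ... the RejectedExecutionException case stays as it is ...
case NonFatal(e) =>
  logError(s"Error executing query in background", e)
  setState(OperationState.ERROR)
  // sketch: wrap the original failure so that HiveSessionImpl's
  // catch (HiveSQLException) branch runs closeOperation(opHandle)
  throw new HiveSQLException("Error executing query in background: " + e.getMessage, e)
{code}

Closing the operation for any Throwable inside executeStatementInternal would also plug the leak, but that code lives on the Hive side rather than in Spark.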




