This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a0088bd7 [KYUUBI #5570] Fix memory leak when using incremental 
collect mode in JDBC engine
0a0088bd7 is described below

commit 0a0088bd7255d0b579924683c273085c6444210f
Author: wforget <[email protected]>
AuthorDate: Wed Nov 1 21:48:21 2023 +0800

    [KYUUBI #5570] Fix memory leak when using incremental collect mode in JDBC 
engine
    
    ### _Why are the changes needed?_
    
    Similar to #3885, there is also memory leak in the jdbc engine when using 
incremental collect mode.
    
    Duplicate of #5161
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5570 from wForget/hotfix.
    
    Closes #5570
    
    dbeddc9cf [wforget] comment
    0c4e499ba [wforget] fix
    b7c421a37 [wforget] fix
    0ef21ce55 [wforget] fix
    f5f2e8048 [wforget] Fix memory leak when using incremental collect mode in 
JDBC engine
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../engine/jdbc/operation/ExecuteStatement.scala   | 37 ++++++++++++++++++----
 .../engine/jdbc/operation/JdbcOperation.scala      |  5 ++-
 .../kyuubi/engine/jdbc/util/ResultSetWrapper.scala |  2 ++
 3 files changed, 36 insertions(+), 8 deletions(-)

diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
index ef49f2b30..b7caea014 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
@@ -18,11 +18,13 @@ package org.apache.kyuubi.engine.jdbc.operation
 
 import java.sql.{Connection, Statement, Types}
 
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema}
 import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
 import org.apache.kyuubi.engine.jdbc.util.ResultSetWrapper
-import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, 
OperationState}
+import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchOrientation, 
IterableFetchIterator, OperationState}
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
@@ -37,6 +39,8 @@ class ExecuteStatement(
   private val operationLog: OperationLog = 
OperationLog.createOperationLog(session, getHandle)
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
 
+  @volatile private var jdbcStatement: Statement = _
+
   override protected def runInternal(): Unit = {
     addTimeoutMonitor(queryTimeout)
     if (shouldRunAsync) {
@@ -55,7 +59,6 @@ class ExecuteStatement(
 
   private def executeStatement(): Unit = {
     setState(OperationState.RUNNING)
-    var jdbcStatement: Statement = null
     try {
       val connection: Connection = 
session.asInstanceOf[JdbcSessionImpl].sessionConnection
       jdbcStatement = dialect.createStatement(connection)
@@ -67,9 +70,12 @@ class ExecuteStatement(
         iter =
           if (incrementalCollect) {
             info("Execute in incremental collect mode")
-            new IterableFetchIterator(resultSetWrapper.toIterable)
+            new IterableFetchIterator(new Iterable[Row] {
+              override def iterator: Iterator[Row] = resultSetWrapper
+            })
           } else {
             warn(s"Execute in full collect mode")
+            jdbcStatement.closeOnCompletion()
             new ArrayFetchIterator(resultSetWrapper.toArray())
           }
       } else {
@@ -89,10 +95,27 @@ class ExecuteStatement(
     } catch {
       onError(true)
     } finally {
-      if (jdbcStatement != null) {
-        jdbcStatement.closeOnCompletion()
-      }
       shutdownTimeoutMonitor()
     }
   }
+
+  override def validateFetchOrientation(order: FetchOrientation): Unit = {
+    if (incrementalCollect && order != FetchOrientation.FETCH_NEXT) {
+      throw KyuubiSQLException(s"The fetch type $order is not supported" +
+        " of incremental collect mode.")
+    }
+    super.validateFetchOrientation(order)
+  }
+
+  override def cleanup(targetState: OperationState): Unit = withLockRequired {
+    try {
+      super.cleanup(targetState)
+    } finally {
+      if (jdbcStatement != null && !jdbcStatement.isClosed) {
+        jdbcStatement.close()
+        jdbcStatement = null
+      }
+    }
+  }
+
 }
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index 2ca173757..788d1ba55 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -36,10 +36,13 @@ abstract class JdbcOperation(session: Session) extends 
AbstractOperation(session
 
   protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf)
 
+  def validateFetchOrientation(order: FetchOrientation): Unit =
+    validateDefaultFetchOrientation(order)
+
   override def getNextRowSetInternal(
       order: FetchOrientation,
       rowSetSize: Int): TFetchResultsResp = {
-    validateDefaultFetchOrientation(order)
+    validateFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
     order match {
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/util/ResultSetWrapper.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/util/ResultSetWrapper.scala
index 8bc7027f1..0fead73b1 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/util/ResultSetWrapper.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/util/ResultSetWrapper.scala
@@ -30,6 +30,7 @@ class ResultSetWrapper(statement: Statement)
   private lazy val metadata = currentResult.getMetaData
 
   override def hasNext: Boolean = {
+    if (currentResult == null) return false
     val result = currentResult.next()
     if (!result) {
       val hasMoreResults = 
statement.getMoreResults(Statement.CLOSE_CURRENT_RESULT)
@@ -37,6 +38,7 @@ class ResultSetWrapper(statement: Statement)
         currentResult = statement.getResultSet
         currentResult.next()
       } else {
+        currentResult = null
         false
       }
     } else {

Reply via email to