This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 0a0088bd7 [KYUUBI #5570] Fix memory leak when using incremental
collect mode in JDBC engine
0a0088bd7 is described below
commit 0a0088bd7255d0b579924683c273085c6444210f
Author: wforget <[email protected]>
AuthorDate: Wed Nov 1 21:48:21 2023 +0800
[KYUUBI #5570] Fix memory leak when using incremental collect mode in JDBC
engine
### _Why are the changes needed?_
Similar to #3885, there is also a memory leak in the JDBC engine when using
incremental collect mode.
Duplicate of #5161
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5570 from wForget/hotfix.
Closes #5570
dbeddc9cf [wforget] comment
0c4e499ba [wforget] fix
b7c421a37 [wforget] fix
0ef21ce55 [wforget] fix
f5f2e8048 [wforget] Fix memory leak when using incremental collect mode in
JDBC engine
Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../engine/jdbc/operation/ExecuteStatement.scala | 37 ++++++++++++++++++----
.../engine/jdbc/operation/JdbcOperation.scala | 5 ++-
.../kyuubi/engine/jdbc/util/ResultSetWrapper.scala | 2 ++
3 files changed, 36 insertions(+), 8 deletions(-)
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
index ef49f2b30..b7caea014 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala
@@ -18,11 +18,13 @@ package org.apache.kyuubi.engine.jdbc.operation
import java.sql.{Connection, Statement, Types}
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema}
import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
import org.apache.kyuubi.engine.jdbc.util.ResultSetWrapper
-import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator,
OperationState}
+import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchOrientation,
IterableFetchIterator, OperationState}
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -37,6 +39,8 @@ class ExecuteStatement(
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
+ @volatile private var jdbcStatement: Statement = _
+
override protected def runInternal(): Unit = {
addTimeoutMonitor(queryTimeout)
if (shouldRunAsync) {
@@ -55,7 +59,6 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
setState(OperationState.RUNNING)
- var jdbcStatement: Statement = null
try {
val connection: Connection =
session.asInstanceOf[JdbcSessionImpl].sessionConnection
jdbcStatement = dialect.createStatement(connection)
@@ -67,9 +70,12 @@ class ExecuteStatement(
iter =
if (incrementalCollect) {
info("Execute in incremental collect mode")
- new IterableFetchIterator(resultSetWrapper.toIterable)
+ new IterableFetchIterator(new Iterable[Row] {
+ override def iterator: Iterator[Row] = resultSetWrapper
+ })
} else {
warn(s"Execute in full collect mode")
+ jdbcStatement.closeOnCompletion()
new ArrayFetchIterator(resultSetWrapper.toArray())
}
} else {
@@ -89,10 +95,27 @@ class ExecuteStatement(
} catch {
onError(true)
} finally {
- if (jdbcStatement != null) {
- jdbcStatement.closeOnCompletion()
- }
shutdownTimeoutMonitor()
}
}
+
+ override def validateFetchOrientation(order: FetchOrientation): Unit = {
+ if (incrementalCollect && order != FetchOrientation.FETCH_NEXT) {
+ throw KyuubiSQLException(s"The fetch type $order is not supported" +
+ " of incremental collect mode.")
+ }
+ super.validateFetchOrientation(order)
+ }
+
+ override def cleanup(targetState: OperationState): Unit = withLockRequired {
+ try {
+ super.cleanup(targetState)
+ } finally {
+ if (jdbcStatement != null && !jdbcStatement.isClosed) {
+ jdbcStatement.close()
+ jdbcStatement = null
+ }
+ }
+ }
+
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index 2ca173757..788d1ba55 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -36,10 +36,13 @@ abstract class JdbcOperation(session: Session) extends
AbstractOperation(session
protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf)
+ def validateFetchOrientation(order: FetchOrientation): Unit =
+ validateDefaultFetchOrientation(order)
+
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
- validateDefaultFetchOrientation(order)
+ validateFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
order match {
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/util/ResultSetWrapper.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/util/ResultSetWrapper.scala
index 8bc7027f1..0fead73b1 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/util/ResultSetWrapper.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/util/ResultSetWrapper.scala
@@ -30,6 +30,7 @@ class ResultSetWrapper(statement: Statement)
private lazy val metadata = currentResult.getMetaData
override def hasNext: Boolean = {
+ if (currentResult == null) return false
val result = currentResult.next()
if (!result) {
val hasMoreResults =
statement.getMoreResults(Statement.CLOSE_CURRENT_RESULT)
@@ -37,6 +38,7 @@ class ResultSetWrapper(statement: Statement)
currentResult = statement.getResultSet
currentResult.next()
} else {
+ currentResult = null
false
}
} else {