This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 7d8e89c27 [KYUUBI #4949] For operation getNextRowSet method, with
operation lock required
7d8e89c27 is described below
commit 7d8e89c27f4e489079e353e40470592a50cfd45c
Author: fwang12 <[email protected]>
AuthorDate: Mon Jun 12 13:31:36 2023 +0800
[KYUUBI #4949] For operation getNextRowSet method, with operation lock
required
### _Why are the changes needed?_
The operation's getNextRowSet method should be executed while holding the
operation lock.
For example, for a Spark operation, the result iterator is not thread-safe, and
it might throw an exception (if the connection socket between the JDBC client
and the Kyuubi server times out).
In incremental collect mode, fetchResult might trigger a Spark task to
collect the incremental result (`self.next().toIterator`).
When the connection from the JDBC client to the Kyuubi gateway times out, the
fetchResult request may already have been sent to the engine.
The JDBC client then re-sends the fetchResult request,
so getNextRowSet is executed concurrently on the Spark engine side.
Because the result iterator is not thread-safe, this might cause an NPE.


### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4949 from turboFei/lock_next_rowset.
Closes #4949
8f18f3236 [fwang12] getNextRowSetInternal and withLockRequired
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../org/apache/kyuubi/engine/chat/operation/ChatOperation.scala | 2 +-
.../org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala | 2 +-
.../org/apache/kyuubi/engine/hive/operation/HiveOperation.scala | 2 +-
.../org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala | 2 +-
.../org/apache/kyuubi/engine/spark/operation/SparkOperation.scala | 2 +-
.../org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala | 2 +-
.../org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala | 2 +-
.../main/scala/org/apache/kyuubi/operation/AbstractOperation.scala | 6 +++++-
.../src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala | 2 +-
.../scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala | 2 +-
.../org/apache/kyuubi/operation/KyuubiApplicationOperation.scala | 2 +-
.../main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala | 2 +-
12 files changed, 16 insertions(+), 12 deletions(-)
diff --git
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
index bb6e8a8a3..3ae21aa9f 100644
---
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
+++
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
@@ -31,7 +31,7 @@ abstract class ChatOperation(session: Session) extends
AbstractOperation(session
protected lazy val conf: KyuubiConf = session.sessionManager.getConf
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 422ae7d4b..ef80034d3 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -91,7 +91,7 @@ abstract class FlinkOperation(session: Session) extends
AbstractOperation(sessio
resp
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index c02569784..2df4a072a 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -92,7 +92,7 @@ abstract class HiveOperation(session: Session) extends
AbstractOperation(session
resp
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
val tOrder = FetchOrientation.toTFetchOrientation(order)
val hiveOrder =
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
val rowSet = internalHiveOperation.getNextRowSet(hiveOrder, rowSetSize)
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index f4d1c27e7..1c4e0c57f 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -36,7 +36,7 @@ abstract class JdbcOperation(session: Session) extends
AbstractOperation(session
protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf)
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 320e67635..5062a4904 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -233,7 +233,7 @@ abstract class SparkOperation(session: Session)
resp
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
var resultRowSet: TRowSet = null
try {
withLocalProperties {
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index eb1b27300..405c93b0b 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -82,7 +82,7 @@ class ExecuteStatement(
}
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
index bff058605..20153b84f 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -54,7 +54,7 @@ abstract class TrinoOperation(session: Session) extends
AbstractOperation(sessio
resp
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 2e52757a2..5264ddebd 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -182,7 +182,11 @@ abstract class AbstractOperation(session: Session) extends
Operation with Loggin
override def getResultSetMetadata: TGetResultSetMetadataResp
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
+ def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet
+
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = withLockRequired {
+ getNextRowSetInternal(order, rowSetSize)
+ }
/**
* convert SQL 'like' pattern to a Java regular expression.
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
index 2d1166525..4093d1fca 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
@@ -76,7 +76,7 @@ class NoopOperation(session: Session, shouldFail: Boolean =
false)
resp
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
val col = TColumn.stringVal(new TStringColumn(Seq(opType).asJava,
ByteBuffer.allocate(0)))
val tRowSet = ThriftUtils.newEmptyRowSet
tRowSet.addToColumns(col)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
index 98065b8cb..93379da4b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
@@ -67,7 +67,7 @@ class ExecutedCommandExec(
if (!shouldRunAsync) getBackgroundHandle.get()
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
index fc109f499..54cfdfe93 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
@@ -54,7 +54,7 @@ abstract class KyuubiApplicationOperation(session: Session)
extends KyuubiOperat
resp
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
applicationInfoMap.map { state =>
val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
Seq(state.keys,
state.values.map(Option(_).getOrElse(""))).map(_.toSeq.asJava).foreach {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 783581a06..e357b7912 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -179,7 +179,7 @@ abstract class KyuubiOperation(session: Session) extends
AbstractOperation(sessi
}
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)