This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 0b7a710a1 [KYUUBI #4949] For operation getNextRowSet method, with 
operation lock required
0b7a710a1 is described below

commit 0b7a710a1b86bc30126f6ba2ed25effa878f5de1
Author: fwang12 <[email protected]>
AuthorDate: Mon Jun 12 13:31:36 2023 +0800

    [KYUUBI #4949] For operation getNextRowSet method, with operation lock 
required
    
    For the operation getNextRowSet method, we shall add lock for it.
    
    For example, for spark operation, the result iterator is not thread-safe, 
it might throw exception(if the jdbc client to kyuubi server connection socket 
timeout).
    
    For incremental collect mode, the fetchResult might trigger a spark task to 
collect the incremental result(`self.next().toIterator`).
    
    The jdbc client to kyuubi gateway timeout, but the fetchResult request has 
been sent to engine.
    Then the jdbc client re-send the fetchResult request.
    
    And the getNextResultSet in spark engine side concurrent execute.
    
    And the result iterator is not thread-safe and might cause NPE.
    
    
![image](https://github.com/apache/kyuubi/assets/6757692/03c369c7-dc12-40d7-aac3-c8f5e799d1cf)
    
![image](https://github.com/apache/kyuubi/assets/6757692/a3414f84-5112-4ea6-a611-f15e6288aba2)
    
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #4949 from turboFei/lock_next_rowset.
    
    Closes #4949
    
    8f18f3236 [fwang12] getNextRowSetInternal and withLockRequired
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
    (cherry picked from commit 7d8e89c27f4e489079e353e40470592a50cfd45c)
    Signed-off-by: fwang12 <[email protected]>
---
 .../org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala   | 2 +-
 .../org/apache/kyuubi/engine/hive/operation/HiveOperation.scala     | 2 +-
 .../org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala     | 2 +-
 .../org/apache/kyuubi/engine/spark/operation/SparkOperation.scala   | 2 +-
 .../org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala | 2 +-
 .../org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala   | 2 +-
 .../main/scala/org/apache/kyuubi/operation/AbstractOperation.scala  | 6 +++++-
 .../src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala  | 2 +-
 .../scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala     | 2 +-
 .../org/apache/kyuubi/operation/KyuubiApplicationOperation.scala    | 2 +-
 .../main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala    | 2 +-
 11 files changed, 15 insertions(+), 11 deletions(-)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index ba450828a..ecef37622 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -85,7 +85,7 @@ abstract class FlinkOperation(session: Session) extends 
AbstractOperation(sessio
     resp
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
diff --git 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index c02569784..2df4a072a 100644
--- 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++ 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -92,7 +92,7 @@ abstract class HiveOperation(session: Session) extends 
AbstractOperation(session
     resp
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     val tOrder = FetchOrientation.toTFetchOrientation(order)
     val hiveOrder = 
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
     val rowSet = internalHiveOperation.getNextRowSet(hiveOrder, rowSetSize)
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index f4d1c27e7..1c4e0c57f 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -36,7 +36,7 @@ abstract class JdbcOperation(session: Session) extends 
AbstractOperation(session
 
   protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf)
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 320e67635..5062a4904 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -233,7 +233,7 @@ abstract class SparkOperation(session: Session)
     resp
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     var resultRowSet: TRowSet = null
     try {
       withLocalProperties {
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index eb1b27300..405c93b0b 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -82,7 +82,7 @@ class ExecuteStatement(
     }
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
index bff058605..20153b84f 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -54,7 +54,7 @@ abstract class TrinoOperation(session: Session) extends 
AbstractOperation(sessio
     resp
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 75d75dbb0..97ed4ffc6 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -182,7 +182,11 @@ abstract class AbstractOperation(session: Session) extends 
Operation with Loggin
 
   override def getResultSetMetadata: TGetResultSetMetadataResp
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
+  def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet
+
+  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = withLockRequired {
+    getNextRowSetInternal(order, rowSetSize)
+  }
 
   /**
    * convert SQL 'like' pattern to a Java regular expression.
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
index 2d1166525..4093d1fca 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
@@ -76,7 +76,7 @@ class NoopOperation(session: Session, shouldFail: Boolean = 
false)
     resp
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     val col = TColumn.stringVal(new TStringColumn(Seq(opType).asJava, 
ByteBuffer.allocate(0)))
     val tRowSet = ThriftUtils.newEmptyRowSet
     tRowSet.addToColumns(col)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
index 98065b8cb..93379da4b 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
@@ -67,7 +67,7 @@ class ExecutedCommandExec(
     if (!shouldRunAsync) getBackgroundHandle.get()
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
index e39b9024f..0a497f876 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
@@ -54,7 +54,7 @@ abstract class KyuubiApplicationOperation(session: Session) 
extends KyuubiOperat
     resp
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     currentApplicationInfo.map(_.toMap).map { state =>
       val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
       Seq(state.keys, 
state.values.map(Option(_).getOrElse(""))).map(_.toSeq.asJava).foreach {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 44b2becd7..660405bcf 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -161,7 +161,7 @@ abstract class KyuubiOperation(session: Session) extends 
AbstractOperation(sessi
     }
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = {
+  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)

Reply via email to