This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 14d0dab69 [KYUUBI #5160] Refactor getNextRowSetInternal to support
fetch streaming data
14d0dab69 is described below
commit 14d0dab697c314366750b8b80de057aee0155ffd
Author: Paul Lin <[email protected]>
AuthorDate: Tue Aug 15 16:06:10 2023 +0800
[KYUUBI #5160] Refactor getNextRowSetInternal to support fetch streaming
data
### _Why are the changes needed?_
Currently, `getNextRowSetInternal` returns a `TRowSet`, which is ill-suited to
signalling an explicit end of stream (EOS) when fetching streaming results.
This PR changes the return type to `TFetchResultsResp`, so that each engine can
decide when EOS has been reached by setting `hasMoreRows` on the response.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
Closes #5160 from link3280/refactor_result.
Closes #5160
09822f2ee [Paul Lin] Fix hasMoreRows missing
c94907e2b [Paul Lin] Explicitly set `resp.setHasMoreRows(false)` for
operations
4d193fb1d [Paul Lin] Revert unrelated changes in FlinkOperation
ffd0367b3 [Paul Lin] Refactor getNextRowSetInternal to support fetch
streaming data
Authored-by: Paul Lin <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../engine/chat/operation/ChatOperation.scala | 9 ++++++--
.../engine/flink/operation/FlinkOperation.scala | 11 ++++++---
.../engine/hive/operation/HiveOperation.scala | 19 +++++++++++----
.../hive/operation/HiveOperationManager.scala | 4 ++--
.../engine/jdbc/operation/JdbcOperation.scala | 11 ++++++---
.../engine/spark/operation/SparkOperation.scala | 11 ++++++---
.../engine/trino/operation/ExecuteStatement.scala | 11 ++++++---
.../engine/trino/operation/TrinoOperation.scala | 11 ++++++---
.../kyuubi/operation/AbstractOperation.scala | 11 +++++----
.../apache/kyuubi/operation/FetchIterator.scala | 2 +-
.../org/apache/kyuubi/operation/Operation.scala | 4 ++--
.../apache/kyuubi/operation/OperationManager.scala | 10 +++++---
.../kyuubi/service/AbstractBackendService.scala | 2 +-
.../org/apache/kyuubi/service/BackendService.scala | 2 +-
.../apache/kyuubi/service/TFrontendService.scala | 9 +++-----
.../apache/kyuubi/session/AbstractSession.scala | 2 +-
.../scala/org/apache/kyuubi/session/Session.scala | 4 ++--
.../apache/kyuubi/operation/NoopOperation.scala | 11 ++++++---
.../kyuubi/operation/NoopOperationManager.scala | 9 +++++---
.../kyuubi/jdbc/hive/KyuubiQueryResultSet.java | 27 ++++++++++++++--------
.../kyuubi/operation/ExecutedCommandExec.scala | 12 +++++++---
.../operation/KyuubiApplicationOperation.scala | 12 +++++++---
.../apache/kyuubi/operation/KyuubiOperation.scala | 10 ++++++--
.../kyuubi/operation/KyuubiOperationManager.scala | 11 +++++----
.../kyuubi/server/BackendServiceMetric.scala | 7 +++---
.../kyuubi/server/api/v1/OperationsResource.scala | 6 +++--
.../kyuubi/server/mysql/MySQLCommandHandler.scala | 3 ++-
.../org/apache/kyuubi/server/trino/api/Query.scala | 2 +-
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 2 +-
.../server/trino/api/TrinoContextSuite.scala | 4 ++--
30 files changed, 166 insertions(+), 83 deletions(-)
diff --git
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
index 3ae21aa9f..b0b1806f8 100644
---
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
+++
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
@@ -31,7 +31,9 @@ abstract class ChatOperation(session: Session) extends
AbstractOperation(session
protected lazy val conf: KyuubiConf = session.sessionManager.getConf
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@@ -47,7 +49,10 @@ abstract class ChatOperation(session: Session) extends
AbstractOperation(session
val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toSeq, 1, getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
- resultRowSet
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(resultRowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override def cancel(): Unit = {
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index ef80034d3..5a79d2c0e 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -25,7 +25,7 @@ import
scala.collection.JavaConverters.collectionAsScalaIterableConverter
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.gateway.service.context.SessionContext
import org.apache.flink.table.gateway.service.operation.OperationExecutor
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet,
TTableSchema}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp,
TGetResultSetMetadataResp, TTableSchema}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.engine.flink.result.ResultSet
@@ -91,7 +91,9 @@ abstract class FlinkOperation(session: Session) extends
AbstractOperation(sessio
resp
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@@ -112,7 +114,10 @@ abstract class FlinkOperation(session: Session) extends
AbstractOperation(sessio
zoneId,
getProtocolVersion)
resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
- resultRowSet
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(resultRowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override def shouldRunAsync: Boolean = false
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index c7166356d..9759fa00b 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.Future
import org.apache.hive.service.cli.operation.{Operation, OperationManager}
import org.apache.hive.service.cli.session.{HiveSession, SessionManager =>
HiveSessionManager}
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp,
TGetResultSetMetadataResp}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
@@ -95,22 +95,31 @@ abstract class HiveOperation(session: Session) extends
AbstractOperation(session
resp
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
val tOrder = FetchOrientation.toTFetchOrientation(order)
val hiveOrder =
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
val rowSet = internalHiveOperation.getNextRowSet(hiveOrder, rowSetSize)
- rowSet.toTRowSet
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(rowSet.toTRowSet)
+ resp.setHasMoreRows(false)
+ resp
}
- def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
= {
+ def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int):
TFetchResultsResp = {
val tOrder = FetchOrientation.toTFetchOrientation(order)
val hiveOrder =
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
val handle = internalHiveOperation.getHandle
- delegatedOperationManager.getOperationLogRowSet(
+ val rowSet = delegatedOperationManager.getOperationLogRowSet(
handle,
hiveOrder,
rowSetSize,
hive.getHiveConf).toTRowSet
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(rowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override def isTimedOut: Boolean =
internalHiveOperation.isTimedOut(System.currentTimeMillis)
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index 0762a2938..4e41e742e 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.hive.operation
import java.util.List
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
@@ -154,7 +154,7 @@ class HiveOperationManager() extends
OperationManager("HiveOperationManager") {
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
- maxRows: Int): TRowSet = {
+ maxRows: Int): TFetchResultsResp = {
val operation = getOperation(opHandle).asInstanceOf[HiveOperation]
operation.getOperationLogRowSet(order, maxRows)
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index 1c4e0c57f..2ca173757 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -16,7 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.operation
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp,
TGetResultSetMetadataResp, TRowSet}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
@@ -36,7 +36,9 @@ abstract class JdbcOperation(session: Session) extends
AbstractOperation(session
protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf)
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@@ -51,7 +53,10 @@ abstract class JdbcOperation(session: Session) extends
AbstractOperation(session
val taken = iter.take(rowSetSize)
val resultRowSet = toTRowSet(taken)
resultRowSet.setStartRowOffset(iter.getPosition)
- resultRowSet
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(resultRowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override def cancel(): Unit = {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 5062a4904..782f443df 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import java.io.IOException
import java.time.ZoneId
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp,
TProgressUpdateResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp,
TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet}
import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -233,7 +233,9 @@ abstract class SparkOperation(session: Session)
resp
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
var resultRowSet: TRowSet = null
try {
withLocalProperties {
@@ -264,7 +266,10 @@ abstract class SparkOperation(session: Session)
}
} catch onError(cancel = true)
- resultRowSet
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(resultRowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override def shouldRunAsync: Boolean = false
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 405c93b0b..3e7cce80c 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.trino.operation
import java.util.concurrent.RejectedExecutionException
-import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.trino.TrinoStatement
@@ -82,7 +82,9 @@ class ExecuteStatement(
}
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@@ -97,7 +99,10 @@ class ExecuteStatement(
val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toList, schema,
getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
- resultRowSet
+ val fetchResultsResp = new TFetchResultsResp(OK_STATUS)
+ fetchResultsResp.setResults(resultRowSet)
+ fetchResultsResp.setHasMoreRows(false)
+ fetchResultsResp
}
private def executeStatement(trinoStatement: TrinoStatement): Unit = {
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
index 20153b84f..11eaa1bc1 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -21,7 +21,7 @@ import java.io.IOException
import io.trino.client.Column
import io.trino.client.StatementClient
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp,
TGetResultSetMetadataResp}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils
@@ -54,7 +54,9 @@ abstract class TrinoOperation(session: Session) extends
AbstractOperation(sessio
resp
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@@ -66,7 +68,10 @@ abstract class TrinoOperation(session: Session) extends
AbstractOperation(sessio
val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toList, schema,
getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
- resultRowSet
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(resultRowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override protected def beforeRun(): Unit = {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 94f53111d..0a185b942 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp,
TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp,
TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus,
TStatusCode}
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
@@ -182,11 +182,12 @@ abstract class AbstractOperation(session: Session)
extends Operation with Loggin
override def getResultSetMetadata: TGetResultSetMetadataResp
- def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet
+ def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int):
TFetchResultsResp
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = withLockRequired {
- getNextRowSetInternal(order, rowSetSize)
- }
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TFetchResultsResp =
+ withLockRequired {
+ getNextRowSetInternal(order, rowSetSize)
+ }
/**
* convert SQL 'like' pattern to a Java regular expression.
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala
index fdada1174..ada155887 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.operation
/**
* Borrowed from Apache Spark, see SPARK-33655
*/
-sealed trait FetchIterator[A] extends Iterator[A] {
+trait FetchIterator[A] extends Iterator[A] {
/**
* Begin a fetch block, forward from the current position.
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
index 6f496c9b8..c20a16f61 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
import java.util.concurrent.Future
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp,
TGetResultSetMetadataResp}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
@@ -32,7 +32,7 @@ trait Operation {
def close(): Unit
def getResultSetMetadata: TGetResultSetMetadataResp
- def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
+ def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TFetchResultsResp
def getSession: Session
def getHandle: OperationHandle
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index 3093bcfbe..38dabcc1a 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -137,18 +137,22 @@ abstract class OperationManager(name: String) extends
AbstractService(name) {
final def getOperationNextRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
- maxRows: Int): TRowSet = {
+ maxRows: Int): TFetchResultsResp = {
getOperation(opHandle).getNextRowSet(order, maxRows)
}
def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
- maxRows: Int): TRowSet = {
+ maxRows: Int): TFetchResultsResp = {
val operationLog = getOperation(opHandle).getOperationLog
- operationLog.map(_.read(order, maxRows)).getOrElse {
+ val rowSet = operationLog.map(_.read(order, maxRows)).getOrElse {
throw KyuubiSQLException(s"$opHandle failed to generate operation log")
}
+ val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
+ resp.setResults(rowSet)
+ resp.setHasMoreRows(false)
+ resp
}
final def removeExpiredOperations(handles: Seq[OperationHandle]):
Seq[Operation] = synchronized {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
index 171e04901..443b35354 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
@@ -201,7 +201,7 @@ abstract class AbstractBackendService(name: String)
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
- fetchLog: Boolean): TRowSet = {
+ fetchLog: Boolean): TFetchResultsResp = {
maxRowsLimit.foreach(limit =>
if (maxRows > limit) {
throw new IllegalArgumentException(s"Max rows for fetching results " +
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
index 968a94197..85df9024c 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
@@ -101,7 +101,7 @@ trait BackendService {
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
- fetchLog: Boolean): TRowSet
+ fetchLog: Boolean): TFetchResultsResp
def sessionManager: SessionManager
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index aa012ab56..7cc23779f 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -520,23 +520,20 @@ abstract class TFrontendService(name: String)
override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
debug(req.toString)
- val resp = new TFetchResultsResp
try {
val operationHandle = OperationHandle(req.getOperationHandle)
val orientation =
FetchOrientation.getFetchOrientation(req.getOrientation)
// 1 means fetching log
val fetchLog = req.getFetchType == 1
val maxRows = req.getMaxRows.toInt
- val rowSet = be.fetchResults(operationHandle, orientation, maxRows,
fetchLog)
- resp.setResults(rowSet)
- resp.setHasMoreRows(false)
- resp.setStatus(OK_STATUS)
+ be.fetchResults(operationHandle, orientation, maxRows, fetchLog)
} catch {
case e: Exception =>
error("Error fetching results: ", e)
+ val resp = new TFetchResultsResp
resp.setStatus(KyuubiSQLException.toTStatus(e))
+ resp
}
- resp
}
protected def notSupportTokenErrorStatus = {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 1a8c51ccd..a9e33f5a0 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -233,7 +233,7 @@ abstract class AbstractSession(
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
- fetchLog: Boolean): TRowSet = {
+ fetchLog: Boolean): TFetchResultsResp = {
if (fetchLog) {
sessionManager.operationManager.getOperationLogRowSet(operationHandle,
orientation, maxRows)
} else {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index bc9f9a8f6..2cdac9f3a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.session
-import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue,
TGetResultSetMetadataResp, TProtocolVersion, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetInfoType,
TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationHandle
@@ -91,7 +91,7 @@ trait Session {
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
- fetchLog: Boolean): TRowSet
+ fetchLog: Boolean): TFetchResultsResp
def closeExpiredOperations(): Unit
}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
index 4093d1fca..c369e00ef 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
-import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc,
TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRowSet, TStringColumn,
TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
+import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc,
TFetchResultsResp, TGetResultSetMetadataResp, TPrimitiveTypeEntry,
TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -76,11 +76,16 @@ class NoopOperation(session: Session, shouldFail: Boolean =
false)
resp
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
val col = TColumn.stringVal(new TStringColumn(Seq(opType).asJava,
ByteBuffer.allocate(0)))
val tRowSet = ThriftUtils.newEmptyRowSet
tRowSet.addToColumns(col)
- tRowSet
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(tRowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override def shouldRunAsync: Boolean = false
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
index 455e5d4d2..352aae905 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.operation
import java.nio.ByteBuffer
import java.util
-import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet,
TStringColumn}
+import org.apache.hive.service.rpc.thrift.{TColumn, TFetchResultsResp, TRow,
TRowSet, TStatus, TStatusCode, TStringColumn}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
@@ -136,13 +136,16 @@ class NoopOperationManager extends
OperationManager("noop") {
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
- maxRows: Int): TRowSet = {
+ maxRows: Int): TFetchResultsResp = {
val logs = new util.ArrayList[String]()
logs.add("test")
val tColumn = TColumn.stringVal(new TStringColumn(logs,
ByteBuffer.allocate(0)))
val tRow = new TRowSet(0, new util.ArrayList[TRow](logs.size()))
tRow.addToColumns(tColumn)
- tRow
+ val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
+ resp.setResults(tRow)
+ resp.setHasMoreRows(false)
+ resp
}
override def getQueryId(operation: Operation): String = {
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java
index 82ea74a01..242ec7720 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java
@@ -26,6 +26,7 @@ import org.apache.hive.service.rpc.thrift.*;
import org.apache.kyuubi.jdbc.hive.cli.RowSet;
import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
import org.apache.kyuubi.jdbc.hive.common.HiveDecimal;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ public class KyuubiQueryResultSet extends KyuubiBaseResultSet
{
private boolean emptyResultSet = false;
private boolean isScrollable = false;
private boolean fetchFirst = false;
+ private boolean hasMoreToFetch = false;
private final TProtocolVersion protocol;
@@ -317,25 +319,20 @@ public class KyuubiQueryResultSet extends
KyuubiBaseResultSet {
try {
TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
if (fetchFirst) {
- // If we are asked to start from begining, clear the current fetched
resultset
+ // If we are asked to start from beginning, clear the current fetched
resultset
orientation = TFetchOrientation.FETCH_FIRST;
fetchedRows = null;
fetchedRowsItr = null;
fetchFirst = false;
}
if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
- TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
orientation, fetchSize);
- TFetchResultsResp fetchResp;
- fetchResp = client.FetchResults(fetchReq);
- Utils.verifySuccessWithInfo(fetchResp.getStatus());
-
- TRowSet results = fetchResp.getResults();
- fetchedRows = RowSetFactory.create(results, protocol);
- fetchedRowsItr = fetchedRows.iterator();
+ fetchResult(orientation);
}
if (fetchedRowsItr.hasNext()) {
row = fetchedRowsItr.next();
+ } else if (hasMoreToFetch) {
+ fetchResult(orientation);
} else {
return false;
}
@@ -350,6 +347,18 @@ public class KyuubiQueryResultSet extends
KyuubiBaseResultSet {
return true;
}
+ private void fetchResult(TFetchOrientation orientation) throws SQLException,
TException {
+ TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation,
fetchSize);
+ TFetchResultsResp fetchResp;
+ fetchResp = client.FetchResults(fetchReq);
+ Utils.verifySuccessWithInfo(fetchResp.getStatus());
+ hasMoreToFetch = fetchResp.isSetHasMoreRows() && fetchResp.isHasMoreRows();
+
+ TRowSet results = fetchResp.getResults();
+ fetchedRows = RowSetFactory.create(results, protocol);
+ fetchedRowsItr = fetchedRows.iterator();
+ }
+
@Override
public ResultSetMetaData getMetaData() throws SQLException {
if (isClosed) {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
index 93379da4b..70b727e5e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.operation
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp,
TGetResultSetMetadataResp}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
@@ -67,11 +67,17 @@ class ExecutedCommandExec(
if (!shouldRunAsync) getBackgroundHandle.get()
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
- command.getNextRowSet(order, rowSetSize, getProtocolVersion)
+ val rowSet = command.getNextRowSet(order, rowSetSize, getProtocolVersion)
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(rowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override def getResultSetMetadata: TGetResultSetMetadataResp = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
index 54cfdfe93..93929c59c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
@@ -22,7 +22,7 @@ import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._
-import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc,
TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn,
TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
+import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc,
TFetchResultsResp, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow,
TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.kyuubi.engine.ApplicationInfo
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -54,8 +54,11 @@ abstract class KyuubiApplicationOperation(session: Session)
extends KyuubiOperat
resp
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
- applicationInfoMap.map { state =>
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
+ val resp = new TFetchResultsResp(OK_STATUS)
+ val rowSet = applicationInfoMap.map { state =>
val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
Seq(state.keys,
state.values.map(Option(_).getOrElse(""))).map(_.toSeq.asJava).foreach {
col =>
@@ -64,5 +67,8 @@ abstract class KyuubiApplicationOperation(session: Session)
extends KyuubiOperat
}
tRow
}.getOrElse(ThriftUtils.EMPTY_ROW_SET)
+ resp.setResults(rowSet)
+ resp.setHasMoreRows(false)
+ resp
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index e357b7912..b85fa8b02 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -179,11 +179,17 @@ abstract class KyuubiOperation(session: Session) extends
AbstractOperation(sessi
}
}
- override def getNextRowSetInternal(order: FetchOrientation, rowSetSize:
Int): TRowSet = {
+ override def getNextRowSetInternal(
+ order: FetchOrientation,
+ rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
- client.fetchResults(_remoteOpHandle, order, rowSetSize, fetchLog = false)
+ val rowset = client.fetchResults(_remoteOpHandle, order, rowSetSize,
fetchLog = false)
+ val resp = new TFetchResultsResp(OK_STATUS)
+ resp.setResults(rowset)
+ resp.setHasMoreRows(false)
+ resp
}
override def shouldRunAsync: Boolean = false
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 3078401b0..8ae9c91f8 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
import java.util.concurrent.TimeUnit
-import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TStatus,
TStatusCode}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
@@ -214,11 +214,11 @@ class KyuubiOperationManager private (name: String)
extends OperationManager(nam
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
- maxRows: Int): TRowSet = {
-
+ maxRows: Int): TFetchResultsResp = {
+ val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
val operationLog = operation.getOperationLog
- operationLog match {
+ val rowSet = operationLog match {
case Some(log) => log.read(order, maxRows)
case None =>
val remoteHandle = operation.remoteOpHandle()
@@ -229,6 +229,9 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
ThriftUtils.EMPTY_ROW_SET
}
}
+ resp.setResults(rowSet)
+ resp.setHasMoreRows(false)
+ resp
}
override def start(): Unit = synchronized {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
index 3a92a5ad0..9da4b78c0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
@@ -183,9 +183,10 @@ trait BackendServiceMetric extends BackendService {
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
- fetchLog: Boolean): TRowSet = {
+ fetchLog: Boolean): TFetchResultsResp = {
MetricsSystem.timerTracing(MetricsConstants.BS_FETCH_RESULTS) {
- val rowSet = super.fetchResults(operationHandle, orientation, maxRows,
fetchLog)
+ val fetchResultsResp = super.fetchResults(operationHandle, orientation,
maxRows, fetchLog)
+ val rowSet = fetchResultsResp.getResults
// TODO: the statistics are wrong when we enabled the arrow.
val rowsSize =
if (rowSet.getColumnsSize > 0) {
@@ -217,7 +218,7 @@ trait BackendServiceMetric extends BackendService {
operation.increaseFetchResultsCount(rowsSize)
}
- rowSet
+ fetchResultsResp
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
index ae5feea52..fdde5bbc5 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
@@ -145,10 +145,11 @@ private[v1] class OperationsResource extends
ApiRequestContext with Logging {
if (fetchOrientation != "FETCH_NEXT" && fetchOrientation !=
"FETCH_FIRST") {
throw new BadRequestException(s"$fetchOrientation in operation log is
not supported")
}
- val rowSet = fe.be.sessionManager.operationManager.getOperationLogRowSet(
+ val fetchResultsResp =
fe.be.sessionManager.operationManager.getOperationLogRowSet(
OperationHandle(operationHandleStr),
FetchOrientation.withName(fetchOrientation),
maxRows)
+ val rowSet = fetchResultsResp.getResults
val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
new OperationLog(logRowSet.asJava, logRowSet.size)
} catch {
@@ -175,11 +176,12 @@ private[v1] class OperationsResource extends
ApiRequestContext with Logging {
@QueryParam("fetchorientation") @DefaultValue("FETCH_NEXT")
fetchOrientation: String): ResultRowSet = {
try {
- val rowSet = fe.be.fetchResults(
+ val fetchResultsResp = fe.be.fetchResults(
OperationHandle(operationHandleStr),
FetchOrientation.withName(fetchOrientation),
maxRows,
fetchLog = false)
+ val rowSet = fetchResultsResp.getResults
val rows = rowSet.getRows.asScala.map(i => {
new Row(i.getColVals.asScala.map(i => {
new Field(
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
index 2f574d904..5f7a07f58 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
@@ -196,11 +196,12 @@ class MySQLCommandHandler(
.getOrElse(KyuubiSQLException(s"Error operator state
${opStatus.state}"))
}
val resultSetMetadata = be.getResultSetMetadata(opHandle)
- val rowSet = be.fetchResults(
+ val fetchResultResp = be.fetchResults(
opHandle,
FetchOrientation.FETCH_NEXT,
Int.MaxValue,
fetchLog = false)
+ val rowSet = fetchResultResp.getResults
MySQLQueryResult(resultSetMetadata.getSchema, rowSet)
} catch {
case rethrow: Exception =>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
index 4e768b04a..dc9de4ae2 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
@@ -69,7 +69,7 @@ case class Query(
queryId.operationHandle,
defaultFetchOrientation,
defaultMaxRows,
- false)
+ false).getResults
TrinoContext.createQueryResults(
queryId.getQueryId,
nextUri,
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 68d95dc80..1826760db 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -151,7 +151,7 @@ class KyuubiOperationYarnClusterSuite extends
WithKyuubiServerOnYarn with HiveJD
}
val resultColumns =
batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 10)
- .getColumns.asScala
+ .getResults.getColumns.asScala
val keys = resultColumns.head.getStringVal.getValues.asScala
val values = resultColumns.apply(1).getStringVal.getValues.asScala
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
index 87c8eda96..6c5a01e46 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
@@ -84,7 +84,7 @@ class TrinoContextSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
checkOpState(opHandleStr, FINISHED)
val metadataResp = fe.be.getResultSetMetadata(opHandle)
- val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT,
1000, false)
+ val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT,
1000, false).getResults
val status = fe.be.getOperationStatus(opHandle, Some(0))
val uri = new URI("sfdsfsdfdsf")
@@ -111,7 +111,7 @@ class TrinoContextSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
checkOpState(opHandleStr, FINISHED)
val metadataResp = fe.be.getResultSetMetadata(opHandle)
- val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT,
1000, false)
+ val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT,
1000, false).getResults
val status = fe.be.getOperationStatus(opHandle, Some(0))
val uri = new URI("sfdsfsdfdsf")