This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 14d0dab69 [KYUUBI #5160] Refactor getNextRowSetInternal to support 
fetch streaming data
14d0dab69 is described below

commit 14d0dab697c314366750b8b80de057aee0155ffd
Author: Paul Lin <[email protected]>
AuthorDate: Tue Aug 15 16:06:10 2023 +0800

    [KYUUBI #5160] Refactor getNextRowSetInternal to support fetch streaming 
data
    
    ### _Why are the changes needed?_
    Currently, `getNextRowSetInternal` returns `TRowSet` which is not friendly 
to explicit EOS in streaming result fetch.
    
    This PR changes the return type to `TFetchResultsResp` to allow the engines 
to determine the EOS.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    Closes #5160 from link3280/refactor_result.
    
    Closes #5160
    
    09822f2ee [Paul Lin] Fix hasMoreRows missing
    c94907e2b [Paul Lin] Explicitly set `resp.setHasMoreRows(false)` for 
operations
    4d193fb1d [Paul Lin] Revert unrelated changes in FlinkOperation
    ffd0367b3 [Paul Lin] Refactor getNextRowSetInternal to support fetch 
streaming data
    
    Authored-by: Paul Lin <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../engine/chat/operation/ChatOperation.scala      |  9 ++++++--
 .../engine/flink/operation/FlinkOperation.scala    | 11 ++++++---
 .../engine/hive/operation/HiveOperation.scala      | 19 +++++++++++----
 .../hive/operation/HiveOperationManager.scala      |  4 ++--
 .../engine/jdbc/operation/JdbcOperation.scala      | 11 ++++++---
 .../engine/spark/operation/SparkOperation.scala    | 11 ++++++---
 .../engine/trino/operation/ExecuteStatement.scala  | 11 ++++++---
 .../engine/trino/operation/TrinoOperation.scala    | 11 ++++++---
 .../kyuubi/operation/AbstractOperation.scala       | 11 +++++----
 .../apache/kyuubi/operation/FetchIterator.scala    |  2 +-
 .../org/apache/kyuubi/operation/Operation.scala    |  4 ++--
 .../apache/kyuubi/operation/OperationManager.scala | 10 +++++---
 .../kyuubi/service/AbstractBackendService.scala    |  2 +-
 .../org/apache/kyuubi/service/BackendService.scala |  2 +-
 .../apache/kyuubi/service/TFrontendService.scala   |  9 +++-----
 .../apache/kyuubi/session/AbstractSession.scala    |  2 +-
 .../scala/org/apache/kyuubi/session/Session.scala  |  4 ++--
 .../apache/kyuubi/operation/NoopOperation.scala    | 11 ++++++---
 .../kyuubi/operation/NoopOperationManager.scala    |  9 +++++---
 .../kyuubi/jdbc/hive/KyuubiQueryResultSet.java     | 27 ++++++++++++++--------
 .../kyuubi/operation/ExecutedCommandExec.scala     | 12 +++++++---
 .../operation/KyuubiApplicationOperation.scala     | 12 +++++++---
 .../apache/kyuubi/operation/KyuubiOperation.scala  | 10 ++++++--
 .../kyuubi/operation/KyuubiOperationManager.scala  | 11 +++++----
 .../kyuubi/server/BackendServiceMetric.scala       |  7 +++---
 .../kyuubi/server/api/v1/OperationsResource.scala  |  6 +++--
 .../kyuubi/server/mysql/MySQLCommandHandler.scala  |  3 ++-
 .../org/apache/kyuubi/server/trino/api/Query.scala |  2 +-
 .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala |  2 +-
 .../server/trino/api/TrinoContextSuite.scala       |  4 ++--
 30 files changed, 166 insertions(+), 83 deletions(-)

diff --git 
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
 
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
index 3ae21aa9f..b0b1806f8 100644
--- 
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
+++ 
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala
@@ -31,7 +31,9 @@ abstract class ChatOperation(session: Session) extends 
AbstractOperation(session
 
   protected lazy val conf: KyuubiConf = session.sessionManager.getConf
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
@@ -47,7 +49,10 @@ abstract class ChatOperation(session: Session) extends 
AbstractOperation(session
     val taken = iter.take(rowSetSize)
     val resultRowSet = RowSet.toTRowSet(taken.toSeq, 1, getProtocolVersion)
     resultRowSet.setStartRowOffset(iter.getPosition)
-    resultRowSet
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(resultRowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def cancel(): Unit = {
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index ef80034d3..5a79d2c0e 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -25,7 +25,7 @@ import 
scala.collection.JavaConverters.collectionAsScalaIterableConverter
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.gateway.service.context.SessionContext
 import org.apache.flink.table.gateway.service.operation.OperationExecutor
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, 
TTableSchema}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp, TTableSchema}
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
 import org.apache.kyuubi.engine.flink.result.ResultSet
@@ -91,7 +91,9 @@ abstract class FlinkOperation(session: Session) extends 
AbstractOperation(sessio
     resp
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
@@ -112,7 +114,10 @@ abstract class FlinkOperation(session: Session) extends 
AbstractOperation(sessio
       zoneId,
       getProtocolVersion)
     resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
-    resultRowSet
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(resultRowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def shouldRunAsync: Boolean = false
diff --git 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
index c7166356d..9759fa00b 100644
--- 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
+++ 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.Future
 
 import org.apache.hive.service.cli.operation.{Operation, OperationManager}
 import org.apache.hive.service.cli.session.{HiveSession, SessionManager => 
HiveSessionManager}
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp}
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
@@ -95,22 +95,31 @@ abstract class HiveOperation(session: Session) extends 
AbstractOperation(session
     resp
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     val tOrder = FetchOrientation.toTFetchOrientation(order)
     val hiveOrder = 
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
     val rowSet = internalHiveOperation.getNextRowSet(hiveOrder, rowSetSize)
-    rowSet.toTRowSet
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(rowSet.toTRowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
-  def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet 
= {
+  def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): 
TFetchResultsResp = {
     val tOrder = FetchOrientation.toTFetchOrientation(order)
     val hiveOrder = 
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
     val handle = internalHiveOperation.getHandle
-    delegatedOperationManager.getOperationLogRowSet(
+    val rowSet = delegatedOperationManager.getOperationLogRowSet(
       handle,
       hiveOrder,
       rowSetSize,
       hive.getHiveConf).toTRowSet
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(rowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def isTimedOut: Boolean = 
internalHiveOperation.isTimedOut(System.currentTimeMillis)
diff --git 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index 0762a2938..4e41e742e 100644
--- 
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++ 
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.hive.operation
 import java.util.List
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp
 
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
@@ -154,7 +154,7 @@ class HiveOperationManager() extends 
OperationManager("HiveOperationManager") {
   override def getOperationLogRowSet(
       opHandle: OperationHandle,
       order: FetchOrientation,
-      maxRows: Int): TRowSet = {
+      maxRows: Int): TFetchResultsResp = {
     val operation = getOperation(opHandle).asInstanceOf[HiveOperation]
     operation.getOperationLogRowSet(order, maxRows)
   }
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index 1c4e0c57f..2ca173757 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.kyuubi.engine.jdbc.operation
 
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp, TRowSet}
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
@@ -36,7 +36,9 @@ abstract class JdbcOperation(session: Session) extends 
AbstractOperation(session
 
   protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf)
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
@@ -51,7 +53,10 @@ abstract class JdbcOperation(session: Session) extends 
AbstractOperation(session
     val taken = iter.take(rowSetSize)
     val resultRowSet = toTRowSet(taken)
     resultRowSet.setStartRowOffset(iter.getPosition)
-    resultRowSet
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(resultRowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def cancel(): Unit = {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 5062a4904..782f443df 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
 import java.io.IOException
 import java.time.ZoneId
 
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, 
TProgressUpdateResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet}
 import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -233,7 +233,9 @@ abstract class SparkOperation(session: Session)
     resp
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     var resultRowSet: TRowSet = null
     try {
       withLocalProperties {
@@ -264,7 +266,10 @@ abstract class SparkOperation(session: Session)
       }
     } catch onError(cancel = true)
 
-    resultRowSet
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(resultRowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def shouldRunAsync: Boolean = false
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 405c93b0b..3e7cce80c 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.trino.operation
 
 import java.util.concurrent.RejectedExecutionException
 
-import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.TFetchResultsResp
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.engine.trino.TrinoStatement
@@ -82,7 +82,9 @@ class ExecuteStatement(
     }
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
@@ -97,7 +99,10 @@ class ExecuteStatement(
     val taken = iter.take(rowSetSize)
     val resultRowSet = RowSet.toTRowSet(taken.toList, schema, 
getProtocolVersion)
     resultRowSet.setStartRowOffset(iter.getPosition)
-    resultRowSet
+    val fetchResultsResp = new TFetchResultsResp(OK_STATUS)
+    fetchResultsResp.setResults(resultRowSet)
+    fetchResultsResp.setHasMoreRows(false)
+    fetchResultsResp
   }
 
   private def executeStatement(trinoStatement: TrinoStatement): Unit = {
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
index 20153b84f..11eaa1bc1 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala
@@ -21,7 +21,7 @@ import java.io.IOException
 
 import io.trino.client.Column
 import io.trino.client.StatementClient
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp}
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.Utils
@@ -54,7 +54,9 @@ abstract class TrinoOperation(session: Session) extends 
AbstractOperation(sessio
     resp
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
@@ -66,7 +68,10 @@ abstract class TrinoOperation(session: Session) extends 
AbstractOperation(sessio
     val taken = iter.take(rowSetSize)
     val resultRowSet = RowSet.toTRowSet(taken.toList, schema, 
getProtocolVersion)
     resultRowSet.setStartRowOffset(iter.getPosition)
-    resultRowSet
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(resultRowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override protected def beforeRun(): Unit = {
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 94f53111d..0a185b942 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantLock
 import scala.collection.JavaConverters._
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, 
TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, 
TStatusCode}
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
@@ -182,11 +182,12 @@ abstract class AbstractOperation(session: Session) 
extends Operation with Loggin
 
   override def getResultSetMetadata: TGetResultSetMetadataResp
 
-  def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet
+  def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): 
TFetchResultsResp
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TRowSet = withLockRequired {
-    getNextRowSetInternal(order, rowSetSize)
-  }
+  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TFetchResultsResp =
+    withLockRequired {
+      getNextRowSetInternal(order, rowSetSize)
+    }
 
   /**
    * convert SQL 'like' pattern to a Java regular expression.
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala
index fdada1174..ada155887 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.operation
 /**
  * Borrowed from Apache Spark, see SPARK-33655
  */
-sealed trait FetchIterator[A] extends Iterator[A] {
+trait FetchIterator[A] extends Iterator[A] {
 
   /**
    * Begin a fetch block, forward from the current position.
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
index 6f496c9b8..c20a16f61 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
 
 import java.util.concurrent.Future
 
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp}
 
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.log.OperationLog
@@ -32,7 +32,7 @@ trait Operation {
   def close(): Unit
 
   def getResultSetMetadata: TGetResultSetMetadataResp
-  def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
+  def getNextRowSet(order: FetchOrientation, rowSetSize: Int): 
TFetchResultsResp
 
   def getSession: Session
   def getHandle: OperationHandle
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index 3093bcfbe..38dabcc1a 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -137,18 +137,22 @@ abstract class OperationManager(name: String) extends 
AbstractService(name) {
   final def getOperationNextRowSet(
       opHandle: OperationHandle,
       order: FetchOrientation,
-      maxRows: Int): TRowSet = {
+      maxRows: Int): TFetchResultsResp = {
     getOperation(opHandle).getNextRowSet(order, maxRows)
   }
 
   def getOperationLogRowSet(
       opHandle: OperationHandle,
       order: FetchOrientation,
-      maxRows: Int): TRowSet = {
+      maxRows: Int): TFetchResultsResp = {
     val operationLog = getOperation(opHandle).getOperationLog
-    operationLog.map(_.read(order, maxRows)).getOrElse {
+    val rowSet = operationLog.map(_.read(order, maxRows)).getOrElse {
       throw KyuubiSQLException(s"$opHandle failed to generate operation log")
     }
+    val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
+    resp.setResults(rowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   final def removeExpiredOperations(handles: Seq[OperationHandle]): 
Seq[Operation] = synchronized {
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
index 171e04901..443b35354 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
@@ -201,7 +201,7 @@ abstract class AbstractBackendService(name: String)
       operationHandle: OperationHandle,
       orientation: FetchOrientation,
       maxRows: Int,
-      fetchLog: Boolean): TRowSet = {
+      fetchLog: Boolean): TFetchResultsResp = {
     maxRowsLimit.foreach(limit =>
       if (maxRows > limit) {
         throw new IllegalArgumentException(s"Max rows for fetching results " +
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
index 968a94197..85df9024c 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
@@ -101,7 +101,7 @@ trait BackendService {
       operationHandle: OperationHandle,
       orientation: FetchOrientation,
       maxRows: Int,
-      fetchLog: Boolean): TRowSet
+      fetchLog: Boolean): TFetchResultsResp
 
   def sessionManager: SessionManager
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index aa012ab56..7cc23779f 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -520,23 +520,20 @@ abstract class TFrontendService(name: String)
 
   override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
     debug(req.toString)
-    val resp = new TFetchResultsResp
     try {
       val operationHandle = OperationHandle(req.getOperationHandle)
       val orientation = 
FetchOrientation.getFetchOrientation(req.getOrientation)
       // 1 means fetching log
       val fetchLog = req.getFetchType == 1
       val maxRows = req.getMaxRows.toInt
-      val rowSet = be.fetchResults(operationHandle, orientation, maxRows, 
fetchLog)
-      resp.setResults(rowSet)
-      resp.setHasMoreRows(false)
-      resp.setStatus(OK_STATUS)
+      be.fetchResults(operationHandle, orientation, maxRows, fetchLog)
     } catch {
       case e: Exception =>
         error("Error fetching results: ", e)
+        val resp = new TFetchResultsResp
         resp.setStatus(KyuubiSQLException.toTStatus(e))
+        resp
     }
-    resp
   }
 
   protected def notSupportTokenErrorStatus = {
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 1a8c51ccd..a9e33f5a0 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -233,7 +233,7 @@ abstract class AbstractSession(
       operationHandle: OperationHandle,
       orientation: FetchOrientation,
       maxRows: Int,
-      fetchLog: Boolean): TRowSet = {
+      fetchLog: Boolean): TFetchResultsResp = {
     if (fetchLog) {
       sessionManager.operationManager.getOperationLogRowSet(operationHandle, 
orientation, maxRows)
     } else {
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index bc9f9a8f6..2cdac9f3a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.session
 
-import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, 
TGetResultSetMetadataResp, TProtocolVersion, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetInfoType, 
TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion}
 
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationHandle
@@ -91,7 +91,7 @@ trait Session {
       operationHandle: OperationHandle,
       orientation: FetchOrientation,
       maxRows: Int,
-      fetchLog: Boolean): TRowSet
+      fetchLog: Boolean): TFetchResultsResp
 
   def closeExpiredOperations(): Unit
 }
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
index 4093d1fca..c369e00ef 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
 
-import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, 
TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRowSet, TStringColumn, 
TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
+import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, 
TFetchResultsResp, TGetResultSetMetadataResp, TPrimitiveTypeEntry, 
TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -76,11 +76,16 @@ class NoopOperation(session: Session, shouldFail: Boolean = 
false)
     resp
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     val col = TColumn.stringVal(new TStringColumn(Seq(opType).asJava, 
ByteBuffer.allocate(0)))
     val tRowSet = ThriftUtils.newEmptyRowSet
     tRowSet.addToColumns(col)
-    tRowSet
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(tRowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def shouldRunAsync: Boolean = false
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
index 455e5d4d2..352aae905 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.operation
 import java.nio.ByteBuffer
 import java.util
 
-import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, 
TStringColumn}
+import org.apache.hive.service.rpc.thrift.{TColumn, TFetchResultsResp, TRow, 
TRowSet, TStatus, TStatusCode, TStringColumn}
 
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.session.Session
@@ -136,13 +136,16 @@ class NoopOperationManager extends 
OperationManager("noop") {
   override def getOperationLogRowSet(
       opHandle: OperationHandle,
       order: FetchOrientation,
-      maxRows: Int): TRowSet = {
+      maxRows: Int): TFetchResultsResp = {
     val logs = new util.ArrayList[String]()
     logs.add("test")
     val tColumn = TColumn.stringVal(new TStringColumn(logs, 
ByteBuffer.allocate(0)))
     val tRow = new TRowSet(0, new util.ArrayList[TRow](logs.size()))
     tRow.addToColumns(tColumn)
-    tRow
+    val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
+    resp.setResults(tRow)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def getQueryId(operation: Operation): String = {
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java
index 82ea74a01..242ec7720 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java
@@ -26,6 +26,7 @@ import org.apache.hive.service.rpc.thrift.*;
 import org.apache.kyuubi.jdbc.hive.cli.RowSet;
 import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
 import org.apache.kyuubi.jdbc.hive.common.HiveDecimal;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,7 @@ public class KyuubiQueryResultSet extends KyuubiBaseResultSet 
{
   private boolean emptyResultSet = false;
   private boolean isScrollable = false;
   private boolean fetchFirst = false;
+  private boolean hasMoreToFetch = false;
 
   private final TProtocolVersion protocol;
 
@@ -317,25 +319,20 @@ public class KyuubiQueryResultSet extends 
KyuubiBaseResultSet {
     try {
       TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
       if (fetchFirst) {
-        // If we are asked to start from begining, clear the current fetched 
resultset
+        // If we are asked to start from beginning, clear the current fetched 
resultset
         orientation = TFetchOrientation.FETCH_FIRST;
         fetchedRows = null;
         fetchedRowsItr = null;
         fetchFirst = false;
       }
       if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
-        TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, 
orientation, fetchSize);
-        TFetchResultsResp fetchResp;
-        fetchResp = client.FetchResults(fetchReq);
-        Utils.verifySuccessWithInfo(fetchResp.getStatus());
-
-        TRowSet results = fetchResp.getResults();
-        fetchedRows = RowSetFactory.create(results, protocol);
-        fetchedRowsItr = fetchedRows.iterator();
+        fetchResult(orientation);
       }
 
       if (fetchedRowsItr.hasNext()) {
         row = fetchedRowsItr.next();
+      } else if (hasMoreToFetch) {
+        fetchResult(orientation);
       } else {
         return false;
       }
@@ -350,6 +347,18 @@ public class KyuubiQueryResultSet extends 
KyuubiBaseResultSet {
     return true;
   }
 
+  private void fetchResult(TFetchOrientation orientation) throws SQLException, 
TException {
+    TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation, 
fetchSize);
+    TFetchResultsResp fetchResp;
+    fetchResp = client.FetchResults(fetchReq);
+    Utils.verifySuccessWithInfo(fetchResp.getStatus());
+    hasMoreToFetch = fetchResp.isSetHasMoreRows() && fetchResp.isHasMoreRows();
+
+    TRowSet results = fetchResp.getResults();
+    fetchedRows = RowSetFactory.create(results, protocol);
+    fetchedRowsItr = fetchedRows.iterator();
+  }
+
   @Override
   public ResultSetMetaData getMetaData() throws SQLException {
     if (isClosed) {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
index 93379da4b..70b727e5e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.operation
 
-import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp}
 
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.log.OperationLog
@@ -67,11 +67,17 @@ class ExecutedCommandExec(
     if (!shouldRunAsync) getBackgroundHandle.get()
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
-    command.getNextRowSet(order, rowSetSize, getProtocolVersion)
+    val rowSet = command.getNextRowSet(order, rowSetSize, getProtocolVersion)
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(rowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def getResultSetMetadata: TGetResultSetMetadataResp = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
index 54cfdfe93..93929c59c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
@@ -22,7 +22,7 @@ import java.util.{ArrayList => JArrayList}
 
 import scala.collection.JavaConverters._
 
-import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, 
TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, 
TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
+import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, 
TFetchResultsResp, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow, 
TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
 
 import org.apache.kyuubi.engine.ApplicationInfo
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -54,8 +54,11 @@ abstract class KyuubiApplicationOperation(session: Session) 
extends KyuubiOperat
     resp
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
-    applicationInfoMap.map { state =>
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
+    val resp = new TFetchResultsResp(OK_STATUS)
+    val rowSet = applicationInfoMap.map { state =>
       val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
       Seq(state.keys, 
state.values.map(Option(_).getOrElse(""))).map(_.toSeq.asJava).foreach {
         col =>
@@ -64,5 +67,8 @@ abstract class KyuubiApplicationOperation(session: Session) 
extends KyuubiOperat
       }
       tRow
     }.getOrElse(ThriftUtils.EMPTY_ROW_SET)
+    resp.setResults(rowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index e357b7912..b85fa8b02 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -179,11 +179,17 @@ abstract class KyuubiOperation(session: Session) extends 
AbstractOperation(sessi
     }
   }
 
-  override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: 
Int): TRowSet = {
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
-    client.fetchResults(_remoteOpHandle, order, rowSetSize, fetchLog = false)
+    val rowset = client.fetchResults(_remoteOpHandle, order, rowSetSize, 
fetchLog = false)
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(rowset)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def shouldRunAsync: Boolean = false
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 3078401b0..8ae9c91f8 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TStatus, 
TStatusCode}
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
@@ -214,11 +214,11 @@ class KyuubiOperationManager private (name: String) 
extends OperationManager(nam
   override def getOperationLogRowSet(
       opHandle: OperationHandle,
       order: FetchOrientation,
-      maxRows: Int): TRowSet = {
-
+      maxRows: Int): TFetchResultsResp = {
+    val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
     val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
     val operationLog = operation.getOperationLog
-    operationLog match {
+    val rowSet = operationLog match {
       case Some(log) => log.read(order, maxRows)
       case None =>
         val remoteHandle = operation.remoteOpHandle()
@@ -229,6 +229,9 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
           ThriftUtils.EMPTY_ROW_SET
         }
     }
+    resp.setResults(rowSet)
+    resp.setHasMoreRows(false)
+    resp
   }
 
   override def start(): Unit = synchronized {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
index 3a92a5ad0..9da4b78c0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
@@ -183,9 +183,10 @@ trait BackendServiceMetric extends BackendService {
       operationHandle: OperationHandle,
       orientation: FetchOrientation,
       maxRows: Int,
-      fetchLog: Boolean): TRowSet = {
+      fetchLog: Boolean): TFetchResultsResp = {
     MetricsSystem.timerTracing(MetricsConstants.BS_FETCH_RESULTS) {
-      val rowSet = super.fetchResults(operationHandle, orientation, maxRows, 
fetchLog)
+      val fetchResultsResp = super.fetchResults(operationHandle, orientation, 
maxRows, fetchLog)
+      val rowSet = fetchResultsResp.getResults
       // TODO: the statistics are wrong when we enabled the arrow.
       val rowsSize =
         if (rowSet.getColumnsSize > 0) {
@@ -217,7 +218,7 @@ trait BackendServiceMetric extends BackendService {
         operation.increaseFetchResultsCount(rowsSize)
       }
 
-      rowSet
+      fetchResultsResp
     }
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
index ae5feea52..fdde5bbc5 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
@@ -145,10 +145,11 @@ private[v1] class OperationsResource extends 
ApiRequestContext with Logging {
       if (fetchOrientation != "FETCH_NEXT" && fetchOrientation != 
"FETCH_FIRST") {
         throw new BadRequestException(s"$fetchOrientation in operation log is 
not supported")
       }
-      val rowSet = fe.be.sessionManager.operationManager.getOperationLogRowSet(
+      val fetchResultsResp = 
fe.be.sessionManager.operationManager.getOperationLogRowSet(
         OperationHandle(operationHandleStr),
         FetchOrientation.withName(fetchOrientation),
         maxRows)
+      val rowSet = fetchResultsResp.getResults
       val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
       new OperationLog(logRowSet.asJava, logRowSet.size)
     } catch {
@@ -175,11 +176,12 @@ private[v1] class OperationsResource extends 
ApiRequestContext with Logging {
       @QueryParam("fetchorientation") @DefaultValue("FETCH_NEXT")
       fetchOrientation: String): ResultRowSet = {
     try {
-      val rowSet = fe.be.fetchResults(
+      val fetchResultsResp = fe.be.fetchResults(
         OperationHandle(operationHandleStr),
         FetchOrientation.withName(fetchOrientation),
         maxRows,
         fetchLog = false)
+      val rowSet = fetchResultsResp.getResults
       val rows = rowSet.getRows.asScala.map(i => {
         new Row(i.getColVals.asScala.map(i => {
           new Field(
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
index 2f574d904..5f7a07f58 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
@@ -196,11 +196,12 @@ class MySQLCommandHandler(
           .getOrElse(KyuubiSQLException(s"Error operator state 
${opStatus.state}"))
       }
       val resultSetMetadata = be.getResultSetMetadata(opHandle)
-      val rowSet = be.fetchResults(
+      val fetchResultResp = be.fetchResults(
         opHandle,
         FetchOrientation.FETCH_NEXT,
         Int.MaxValue,
         fetchLog = false)
+      val rowSet = fetchResultResp.getResults
       MySQLQueryResult(resultSetMetadata.getSchema, rowSet)
     } catch {
       case rethrow: Exception =>
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
index 4e768b04a..dc9de4ae2 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
@@ -69,7 +69,7 @@ case class Query(
           queryId.operationHandle,
           defaultFetchOrientation,
           defaultMaxRows,
-          false)
+          false).getResults
         TrinoContext.createQueryResults(
           queryId.getQueryId,
           nextUri,
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 68d95dc80..1826760db 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -151,7 +151,7 @@ class KyuubiOperationYarnClusterSuite extends 
WithKyuubiServerOnYarn with HiveJD
     }
 
     val resultColumns = 
batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 10)
-      .getColumns.asScala
+      .getResults.getColumns.asScala
 
     val keys = resultColumns.head.getStringVal.getValues.asScala
     val values = resultColumns.apply(1).getStringVal.getValues.asScala
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
index 87c8eda96..6c5a01e46 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala
@@ -84,7 +84,7 @@ class TrinoContextSuite extends KyuubiFunSuite with 
RestFrontendTestHelper {
     checkOpState(opHandleStr, FINISHED)
 
     val metadataResp = fe.be.getResultSetMetadata(opHandle)
-    val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 
1000, false)
+    val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 
1000, false).getResults
     val status = fe.be.getOperationStatus(opHandle, Some(0))
 
     val uri = new URI("sfdsfsdfdsf")
@@ -111,7 +111,7 @@ class TrinoContextSuite extends KyuubiFunSuite with 
RestFrontendTestHelper {
     checkOpState(opHandleStr, FINISHED)
 
     val metadataResp = fe.be.getResultSetMetadata(opHandle)
-    val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 
1000, false)
+    val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 
1000, false).getResults
     val status = fe.be.getOperationStatus(opHandle, Some(0))
 
     val uri = new URI("sfdsfsdfdsf")


Reply via email to