This is an automated email from the ASF dual-hosted git repository.

paullin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new e9ca8272b [KYUUBI #4806][FLINK] Support time-out incremental result fetch for Flink engine
e9ca8272b is described below

commit e9ca8272b07e9cc48292c21cb6b035a9381b2c93
Author: Paul Lin <[email protected]>
AuthorDate: Thu Aug 24 11:58:08 2023 +0800

    [KYUUBI #4806][FLINK] Support time-out incremental result fetch for Flink engine
    
    ### _Why are the changes needed?_
    Support incremental result fetching with a configurable timeout for the Flink engine, so that result fetches do not block indefinitely when no rows arrive.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    Closes #5134 from link3280/KYUUBI-4806.
    
    Closes #4806
    
    a1b74783c [Paul Lin] Optimize code style
    546cfdf5b [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
    b6eb7af4f [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
    1563fa98b [Paul Lin] Remove explicit StartRowOffset for Flink
    4e61a348c [Paul Lin] Add comments
    c93294650 [Paul Lin] Improve code style
    6bd0c8e69 [Paul Lin] Use dedicated thread pool
    15412db3a [Paul Lin] Improve logging
    d6a2a9cff [Paul Lin] [KYUUBI #4806][FLINK] Implement incremental result fetching
    
    Authored-by: Paul Lin <[email protected]>
    Signed-off-by: Paul Lin <[email protected]>
---
 docs/configuration/settings.md                     |   1 +
 .../engine/flink/operation/ExecuteStatement.scala  |  11 +-
 .../engine/flink/operation/FlinkOperation.scala    |  34 +++-
 .../flink/operation/FlinkSQLOperationManager.scala |  23 ++-
 .../engine/flink/operation/PlanOnlyStatement.scala |  11 +-
 .../flink/result/QueryResultFetchIterator.scala    | 176 +++++++++++++++++++++
 .../kyuubi/engine/flink/result/ResultSet.scala     |   7 +
 .../kyuubi/engine/flink/result/ResultSetUtil.scala |  72 ++-------
 .../flink/session/FlinkSQLSessionManager.scala     |   5 +-
 .../flink/operation/FlinkOperationSuite.scala      |  25 ++-
 .../it/flink/operation/FlinkOperationSuite.scala   |   4 +-
 .../operation/FlinkOperationSuiteOnYarn.scala      |   4 +-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |   9 ++
 13 files changed, 292 insertions(+), 90 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 928ff0ab8..ddb8546c2 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -411,6 +411,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.session.engine.alive.probe.interval | PT10S | The interval for engine alive probe. [...]
 | kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. [...]
 | kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout [...]
+| kyuubi.session.engine.flink.fetch.timeout | &lt;undefined&gt; | Result fetch timeout for Flink engine. If the timeout is reached, the result fetch would be stopped and the currently fetched data would be returned. If no data is fetched, a TimeoutException would be thrown. [...]
 | kyuubi.session.engine.flink.main.resource | &lt;undefined&gt; | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default [...]
 | kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. [...]
 | kyuubi.session.engine.hive.main.resource | &lt;undefined&gt; | The package used to create Hive engine remote job. If it is undefined, Kyuubi will use the default [...]
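
The new option takes a duration value. As a minimal illustration of how a deployment might enable it (the one-minute value and the kyuubi-defaults.conf placement are only examples, not a recommendation):

    # $KYUUBI_HOME/conf/kyuubi-defaults.conf
    kyuubi.session.engine.flink.fetch.timeout=PT1M
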
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 4042756b6..0e0c476e2 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import scala.concurrent.duration.Duration
+
 import org.apache.flink.api.common.JobID
 import org.apache.flink.table.gateway.api.operation.OperationHandle
 
@@ -32,7 +34,8 @@ class ExecuteStatement(
     override val statement: String,
     override val shouldRunAsync: Boolean,
     queryTimeout: Long,
-    resultMaxRows: Int)
+    resultMaxRows: Int,
+    resultFetchTimeout: Duration)
   extends FlinkOperation(session) with Logging {
 
   private val operationLog: OperationLog =
@@ -48,10 +51,6 @@ class ExecuteStatement(
     setHasResultSet(true)
   }
 
-  override protected def afterRun(): Unit = {
-    OperationLog.removeCurrentOperationLog()
-  }
-
   override protected def runInternal(): Unit = {
     addTimeoutMonitor(queryTimeout)
     executeStatement()
@@ -64,7 +63,7 @@ class ExecuteStatement(
         new OperationHandle(getHandle.identifier),
         statement)
       jobId = FlinkEngineUtils.getResultJobId(resultFetcher)
-      resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows)
+      resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout)
       setState(OperationState.FINISHED)
     } catch {
       onError(cancel = true)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 5a79d2c0e..1424b721c 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -19,12 +19,15 @@ package org.apache.kyuubi.engine.flink.operation
 
 import java.io.IOException
 import java.time.ZoneId
+import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.mutable.ListBuffer
 
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.gateway.service.context.SessionContext
 import org.apache.flink.table.gateway.service.operation.OperationExecutor
+import org.apache.flink.types.Row
 import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TTableSchema}
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -72,6 +75,10 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
 
   override def close(): Unit = {
     cleanup(OperationState.CLOSED)
+    // the result set may be null if the operation ends exceptionally
+    if (resultSet != null) {
+      resultSet.close
+    }
     try {
       getOperationLog.foreach(_.close())
     } catch {
@@ -98,25 +105,42 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
     order match {
-      case FETCH_NEXT => resultSet.getData.fetchNext()
       case FETCH_PRIOR => resultSet.getData.fetchPrior(rowSetSize);
       case FETCH_FIRST => resultSet.getData.fetchAbsolute(0);
+      case FETCH_NEXT => // ignored because new data are fetched lazily
+    }
+    val batch = new ListBuffer[Row]
+    try {
+      // there could be null values at the end of the batch
+      // because Flink could return an EOS
+      var rows = 0
+      while (resultSet.getData.hasNext && rows < rowSetSize) {
+        Option(resultSet.getData.next()).foreach { r => batch += r; rows += 1 }
+      }
+    } catch {
+      case e: TimeoutException =>
+        // ignore and return the current batch if there's some data
+        // otherwise, rethrow the timeout exception
+        if (batch.nonEmpty) {
+          debug(s"Timeout fetching more data for $opType operation. " +
+            s"Returning the current fetched data.")
+        } else {
+          throw e
+        }
     }
-    val token = resultSet.getData.take(rowSetSize)
     val timeZone = Option(flinkSession.getSessionConfig.get("table.local-time-zone"))
     val zoneId = timeZone match {
       case Some(tz) => ZoneId.of(tz)
       case None => ZoneId.systemDefault()
     }
     val resultRowSet = RowSet.resultSetToTRowSet(
-      token.toList,
+      batch.toList,
       resultSet,
       zoneId,
       getProtocolVersion)
-    resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
     val resp = new TFetchResultsResp(OK_STATUS)
     resp.setResults(resultRowSet)
-    resp.setHasMoreRows(false)
+    resp.setHasMoreRows(resultSet.getData.hasNext)
     resp
   }
 
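
The pattern above — drain the iterator up to rowSetSize, and on timeout return whatever was collected or rethrow — can be sketched in isolation. This is a minimal, self-contained illustration over a plain Iterator; drainBatch and its parameters are hypothetical names, not the Kyuubi API:

    import java.util.concurrent.TimeoutException
    import scala.collection.mutable.ListBuffer

    def drainBatch[T](it: Iterator[T], rowSetSize: Int): List[T] = {
      val batch = new ListBuffer[T]
      try {
        var rows = 0
        while (it.hasNext && rows < rowSetSize) {
          // next() may yield null at the end of a batch (EOS), so filter it out
          Option(it.next()).foreach { r => batch += r; rows += 1 }
        }
      } catch {
        case e: TimeoutException =>
          // a partial batch is still a valid result; only rethrow if nothing arrived
          if (batch.isEmpty) throw e
      }
      batch.toList
    }
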
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 712c13596..d5c0629ee 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -20,6 +20,8 @@ package org.apache.kyuubi.engine.flink.operation
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.{Duration, DurationLong}
+import scala.language.postfixOps
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf._
@@ -66,14 +68,31 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
       flinkSession.normalizedConf.getOrElse(
         ENGINE_FLINK_MAX_ROWS.key,
         resultMaxRowsDefault.toString).toInt
+
+    val resultFetchTimeout =
+      flinkSession.normalizedConf.get(ENGINE_FLINK_FETCH_TIMEOUT.key).map(_.toLong milliseconds)
+        .getOrElse(Duration.Inf)
+
     val op = mode match {
       case NoneMode =>
         // FLINK-24427 seals calcite classes which required to access in async mode, considering
         // there is no much benefit in async mode, here we just ignore `runAsync` and always run
         // statement in sync mode as a workaround
-        new ExecuteStatement(session, statement, false, queryTimeout, resultMaxRows)
+        new ExecuteStatement(
+          session,
+          statement,
+          false,
+          queryTimeout,
+          resultMaxRows,
+          resultFetchTimeout)
       case mode =>
-        new PlanOnlyStatement(session, statement, mode)
+        new PlanOnlyStatement(
+          session,
+          statement,
+          mode,
+          queryTimeout,
+          resultMaxRows,
+          resultFetchTimeout)
     }
     addOperation(op)
   }
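
For reference, the conf-to-Duration mapping above can be exercised on its own. A minimal sketch — the map literal and the 60000 ms value are hypothetical stand-ins for flinkSession.normalizedConf:

    import scala.concurrent.duration.{Duration, DurationLong}

    val normalizedConf = Map("kyuubi.session.engine.flink.fetch.timeout" -> "60000")

    val resultFetchTimeout: Duration =
      normalizedConf.get("kyuubi.session.engine.flink.fetch.timeout")
        .map(_.toLong.milliseconds) // the raw value is interpreted as milliseconds
        .getOrElse(Duration.Inf)    // unset means block indefinitely, as before
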
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
index 4f5d8218f..1284bfd73 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import scala.concurrent.duration.Duration
+
 import com.google.common.base.Preconditions
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.gateway.api.operation.OperationHandle
@@ -34,7 +36,10 @@ import org.apache.kyuubi.session.Session
 class PlanOnlyStatement(
     session: Session,
     override val statement: String,
-    mode: PlanOnlyMode) extends FlinkOperation(session) {
+    mode: PlanOnlyMode,
+    queryTimeout: Long,
+    resultMaxRows: Int,
+    resultFetchTimeout: Duration) extends FlinkOperation(session) {
 
   private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
   private val lineSeparator: String = System.lineSeparator()
@@ -46,6 +51,7 @@ class PlanOnlyStatement(
   }
 
   override protected def runInternal(): Unit = {
+    addTimeoutMonitor(queryTimeout)
     try {
       val operations = executor.getTableEnvironment.getParser.parse(statement)
       Preconditions.checkArgument(
@@ -59,7 +65,8 @@ class PlanOnlyStatement(
           val resultFetcher = executor.executeStatement(
             new OperationHandle(getHandle.identifier),
             statement)
-          resultSet = ResultSetUtil.fromResultFetcher(resultFetcher);
+          resultSet =
+            ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout);
         case _ => explainOperation(statement)
       }
     } catch {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
new file mode 100644
index 000000000..60ae08d9d
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+import java.util
+import java.util.concurrent.Executors
+
+import scala.collection.convert.ImplicitConversions._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+import scala.concurrent.duration.Duration
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.catalog.ResolvedSchema
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.data.conversion.DataStructureConverters
+import org.apache.flink.table.gateway.service.result.ResultFetcher
+import org.apache.flink.table.types.DataType
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
+import org.apache.kyuubi.operation.FetchIterator
+
+class QueryResultFetchIterator(
+    resultFetcher: ResultFetcher,
+    maxRows: Int = 1000000,
+    resultFetchTimeout: Duration = Duration.Inf) extends FetchIterator[Row] with Logging {
+
+  val schema: ResolvedSchema = resultFetcher.getResultSchema
+
+  val dataTypes: util.List[DataType] = schema.getColumnDataTypes
+
+  var token: Long = 0
+
+  var pos: Long = 0
+
+  var fetchStart: Long = 0
+
+  var bufferedRows: Array[Row] = new Array[Row](0)
+
+  var hasNext: Boolean = true
+
+  val FETCH_INTERVAL_MS: Long = 1000
+
+  private val executor = Executors.newSingleThreadScheduledExecutor(
+    new ThreadFactoryBuilder().setNameFormat("flink-query-iterator-%d").setDaemon(true).build)
+
+  implicit private val executionContext: ExecutionContextExecutor =
+    ExecutionContext.fromExecutor(executor)
+
+  /**
+   * Begin a fetch block, forward from the current position.
+   *
+   * Throws TimeoutException if no data is fetched within the timeout.
+   */
+  override def fetchNext(): Unit = {
+    if (!hasNext) {
+      return
+    }
+    val future = Future {
+      var fetched = false
+      // if no timeout is set, this would block until some rows are fetched
+      debug(s"Fetching from result store with timeout $resultFetchTimeout ms")
+      while (!fetched && !Thread.interrupted()) {
+        val rs = resultFetcher.fetchResults(token, maxRows - bufferedRows.length)
+        val flinkRs = new FlinkResultSet(rs)
+        // TODO: replace string-based match when Flink 1.16 support is dropped
+        flinkRs.getResultType.name() match {
+          case "EOS" =>
+            debug("EOS received, no more data to fetch.")
+            fetched = true
+            hasNext = false
+          case "NOT_READY" =>
+            // if flink jobs are not ready, continue to retry
+            debug("Result not ready, retrying...")
+          case "PAYLOAD" =>
+            val fetchedData = flinkRs.getData
+            // if no data fetched, continue to retry
+            if (!fetchedData.isEmpty) {
+              debug(s"Fetched ${fetchedData.length} rows from result store.")
+              fetched = true
+              bufferedRows ++= fetchedData.map(rd => convertToRow(rd, dataTypes.toList))
+              fetchStart = pos
+            } else {
+              debug("No data fetched, retrying...")
+            }
+          case _ =>
+            throw new RuntimeException(s"Unexpected result type: ${flinkRs.getResultType}")
+        }
+        if (hasNext) {
+          val nextToken = flinkRs.getNextToken
+          if (nextToken == null) {
+            hasNext = false
+          } else {
+            token = nextToken
+          }
+        }
+        Thread.sleep(FETCH_INTERVAL_MS)
+      }
+    }
+    Await.result(future, resultFetchTimeout)
+  }
+
+  /**
+   * Begin a fetch block, moving the iterator to the given position.
+   * Resets the fetch start offset.
+   *
+   * @param pos the index to move the iterator position to.
+   */
+  override def fetchAbsolute(pos: Long): Unit = {
+    val effectivePos = Math.max(pos, 0)
+    if (effectivePos < bufferedRows.length) {
+      this.fetchStart = effectivePos
+      return
+    }
+    throw new IllegalArgumentException(s"Cannot skip to an unreachable position $effectivePos.")
+  }
+
+  override def getFetchStart: Long = fetchStart
+
+  override def getPosition: Long = pos
+
+  /**
+   * @return the next row if any, or null if no more rows can be fetched.
+   */
+  override def next(): Row = {
+    if (pos < bufferedRows.length) {
+      debug(s"Fetching from buffered rows at pos $pos.")
+      val row = bufferedRows(pos.toInt)
+      pos += 1
+      if (pos >= maxRows) {
+        hasNext = false
+      }
+      row
+    } else {
+      // block until some rows are fetched or TimeoutException is thrown
+      fetchNext()
+      if (hasNext) {
+        val row = bufferedRows(pos.toInt)
+        pos += 1
+        if (pos >= maxRows) {
+          hasNext = false
+        }
+        row
+      } else {
+        null
+      }
+    }
+  }
+
+  def close(): Unit = {
+    resultFetcher.close()
+    executor.shutdown()
+  }
+
+  private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
+    val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*))
+    converter.toExternal(r).asInstanceOf[Row]
+  }
+}
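
The timeout semantics above hinge on Await.result: with Duration.Inf it blocks until the fetch loop produces rows or sees EOS, while a finite duration throws TimeoutException. A self-contained sketch of that behavior, assuming only the standard library (the sleeping Future is a stand-in for a fetch loop that yields no data in time):

    import java.util.concurrent.{Executors, TimeoutException}
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

    val future = Future { Thread.sleep(5000); "rows" }

    try {
      Await.result(future, 1.second) // finite timeout: stop waiting after 1s
    } catch {
      case _: TimeoutException =>
        // FlinkOperation catches this and returns the partial batch,
        // or rethrows when nothing was fetched at all
        println("fetch timed out")
    }
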
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
index 1e94042d0..b8d407297 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -50,6 +50,13 @@ case class ResultSet(
   def getColumns: util.List[Column] = columns
 
   def getData: FetchIterator[Row] = data
+
+  def close: Unit = {
+    data match {
+      case queryIte: QueryResultFetchIterator => queryIte.close()
+      case _ =>
+    }
+  }
 }
 
 /**
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index c1169528c..8b722f1e5 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -17,25 +17,17 @@
 
 package org.apache.kyuubi.engine.flink.result
 
-import scala.collection.convert.ImplicitConversions._
-import scala.collection.mutable.ListBuffer
+import scala.concurrent.duration.Duration
 
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.catalog.Column
-import org.apache.flink.table.data.RowData
-import org.apache.flink.table.data.conversion.DataStructureConverters
 import org.apache.flink.table.gateway.service.result.ResultFetcher
-import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
-import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
-
 /** Utility object for building ResultSet. */
 object ResultSetUtil {
 
-  private val FETCH_ROWS_PER_SECOND = 1000
-
   /**
    * Build a ResultSet with a column name and a list of String values.
    *
@@ -66,63 +58,19 @@ object ResultSetUtil {
       .data(Array[Row](Row.of("OK")))
       .build
 
-  def fromResultFetcher(resultFetcher: ResultFetcher, maxRows: Int): ResultSet = {
+  def fromResultFetcher(
+      resultFetcher: ResultFetcher,
+      maxRows: Int,
+      resultFetchTimeout: Duration): ResultSet = {
+    if (maxRows <= 0) {
+      throw new IllegalArgumentException("maxRows should be positive")
+    }
     val schema = resultFetcher.getResultSchema
-    val resultRowData = ListBuffer.newBuilder[RowData]
-    var fetched: FlinkResultSet = null
-    var token: Long = 0
-    var rowNum: Int = 0
-    do {
-      fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
-      val data = fetched.getData
-      val slice = data.slice(0, maxRows - rowNum)
-      resultRowData ++= slice
-      rowNum += slice.size
-      token = fetched.getNextToken
-      try Thread.sleep(1000L)
-      catch {
-        case _: InterruptedException => fetched.getNextToken == null
-      }
-    } while (
-      fetched.getNextToken != null &&
-        rowNum < maxRows &&
-        fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
-    )
-    val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
+    val ite = new QueryResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
     ResultSet.builder
       .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
       .columns(schema.getColumns)
-      .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
+      .data(ite)
       .build
   }
-
-  def fromResultFetcher(resultFetcher: ResultFetcher): ResultSet = {
-    val schema = resultFetcher.getResultSchema
-    val resultRowData = ListBuffer.newBuilder[RowData]
-    var fetched: FlinkResultSet = null
-    var token: Long = 0
-    do {
-      fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
-      resultRowData ++= fetched.getData
-      token = fetched.getNextToken
-      try Thread.sleep(1000L)
-      catch {
-        case _: InterruptedException =>
-      }
-    } while (
-      fetched.getNextToken != null &&
-        fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
-    )
-    val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
-    ResultSet.builder
-      .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-      .columns(schema.getColumns)
-      .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
-      .build
-  }
-
-  private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
-    val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*))
-    converter.toExternal(r).asInstanceOf[Row]
-  }
 }
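
With this change, fromResultFetcher validates maxRows up front and hands the ResultSet a lazy iterator instead of pre-materialized rows. The same fail-fast-plus-lazy shape, reduced to a toy example with hypothetical names:

    def buildRows(maxRows: Int): Iterator[Int] = {
      require(maxRows > 0, "maxRows should be positive") // fail fast, as above
      Iterator.from(0).take(maxRows) // rows are produced only when consumed
    }
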
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index c7aa7c3c5..b7cd46217 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -77,9 +77,10 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
+    val fSession = super.getSessionOption(sessionHandle)
+    fSession.foreach(s =>
+      sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
     super.closeSession(sessionHandle)
-    sessionManager.closeSession(
-      new org.apache.flink.table.gateway.api.session.SessionHandle(sessionHandle.identifier))
   }
 
   override def stop(): Unit = synchronized {
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 35b59b661..8e7c35a95 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
 import org.apache.kyuubi.engine.flink.WithFlinkTestResources
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
-import org.apache.kyuubi.jdbc.hive.KyuubiStatement
+import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement}
 import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -676,12 +676,10 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
           assert(stopResult1.next())
           assert(stopResult1.getString(1) === "OK")
 
-          val selectResult = statement.executeQuery("select * from tbl_a")
-          val jobId2 = statement.asInstanceOf[KyuubiStatement].getQueryId
-          assert(jobId2 !== null)
-          while (!selectResult.next()) {
-            Thread.sleep(1000L)
-          }
+          val insertResult2 = statement.executeQuery("insert into tbl_b select * from tbl_a")
+          assert(insertResult2.next())
+          val jobId2 = insertResult2.getString(1)
+
           val stopResult2 = statement.executeQuery(s"stop job '$jobId2' with savepoint")
           assert(stopResult2.getMetaData.getColumnName(1).equals("savepoint path"))
           assert(stopResult2.next())
@@ -1252,4 +1250,17 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
       }
     }
   }
+
+  test("test result fetch timeout") {
+    val exception = intercept[KyuubiSQLException](
+      withSessionConf()(Map(ENGINE_FLINK_FETCH_TIMEOUT.key -> "60000"))() {
+        withJdbcStatement("tbl_a") { stmt =>
+          stmt.executeQuery("create table tbl_a (a int) " +
+            "with ('connector' = 'datagen', 'rows-per-second'='0')")
+          val resultSet = stmt.executeQuery("select * from tbl_a")
+          while (resultSet.next()) {}
+        }
+      })
+    assert(exception.getMessage === "Futures timed out after [60000 milliseconds]")
+  }
 }
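
From a client's point of view, the behavior under test looks roughly like the following. This is an illustrative sketch only — the URL, port, session-conf syntax, and table name are hypothetical:

    import java.sql.DriverManager

    // session conf after '#' sets a 60s fetch timeout for this connection
    val url = "jdbc:hive2://localhost:10009/;#kyuubi.session.engine.flink.fetch.timeout=60000"
    val conn = DriverManager.getConnection(url)
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("SELECT * FROM some_streaming_table")
    // rs.next() returns as soon as any rows arrive, or fails with
    // "Futures timed out after [60000 milliseconds]" if none arrive in time
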
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index 893e0020a..55476bfd0 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -31,7 +31,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
   override val conf: KyuubiConf = KyuubiConf()
     .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
     .set(ENGINE_TYPE, "FLINK_SQL")
-    .set("flink.parallelism.default", "6")
+    .set("flink.parallelism.default", "2")
 
   override protected def jdbcUrl: String = getJdbcUrl
 
@@ -72,7 +72,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
       var success = false
       while (resultSet.next() && !success) {
         if (resultSet.getString(1) == "parallelism.default" &&
-          resultSet.getString(2) == "6") {
+          resultSet.getString(2) == "2") {
           success = true
         }
       }
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
index afa4dce8f..ee6b9bb98 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
@@ -40,7 +40,7 @@ class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster
       .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
       .set(ENGINE_TYPE, "FLINK_SQL")
       .set("flink.execution.target", "yarn-application")
-      .set("flink.parallelism.default", "6")
+      .set("flink.parallelism.default", "2")
     super.beforeAll()
   }
 
@@ -81,7 +81,7 @@ class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster
       var success = false
       while (resultSet.next() && !success) {
         if (resultSet.getString(1) == "parallelism.default" &&
-          resultSet.getString(2) == "6") {
+          resultSet.getString(2) == "2") {
           success = true
         }
       }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 515ffbc34..3f1c3b868 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1319,6 +1319,15 @@ object KyuubiConf {
       .intConf
       .createWithDefault(1000000)
 
+  val ENGINE_FLINK_FETCH_TIMEOUT: OptionalConfigEntry[Long] =
+    buildConf("kyuubi.session.engine.flink.fetch.timeout")
+      .doc("Result fetch timeout for Flink engine. If the timeout is reached, 
the result " +
+        "fetch would be stopped and the current fetched would be returned. If 
no data are " +
+        "fetched, a TimeoutException would be thrown.")
+      .version("1.8.0")
+      .timeConf
+      .createOptional
+
   val ENGINE_TRINO_MAIN_RESOURCE: OptionalConfigEntry[String] =
     buildConf("kyuubi.session.engine.trino.main.resource")
       .doc("The package used to create Trino engine remote job. If it is 
undefined," +
