This is an automated email from the ASF dual-hosted git repository.
paullin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new e9ca8272b [KYUUBI #4806][FLINK] Support time-out incremental result fetch for Flink engine
e9ca8272b is described below
commit e9ca8272b07e9cc48292c21cb6b035a9381b2c93
Author: Paul Lin <[email protected]>
AuthorDate: Thu Aug 24 11:58:08 2023 +0800
[KYUUBI #4806][FLINK] Support time-out incremental result fetch for Flink engine
### _Why are the changes needed?_
Incremental result fetching on the Flink engine could block indefinitely when a query yields no rows. This change introduces a configurable fetch timeout (`kyuubi.session.engine.flink.fetch.timeout`): when it expires, the fetch returns the rows gathered so far, or throws a TimeoutException if nothing was fetched.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
Closes #5134 from link3280/KYUUBI-4806.
Closes #4806
a1b74783c [Paul Lin] Optimize code style
546cfdf5b [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
b6eb7af4f [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
1563fa98b [Paul Lin] Remove explicit StartRowOffset for Flink
4e61a348c [Paul Lin] Add comments
c93294650 [Paul Lin] Improve code style
6bd0c8e69 [Paul Lin] Use dedicated thread pool
15412db3a [Paul Lin] Improve logging
d6a2a9cff [Paul Lin] [KYUUBI #4806][FLINK] Implement incremental result fetching
Authored-by: Paul Lin <[email protected]>
Signed-off-by: Paul Lin <[email protected]>
---
docs/configuration/settings.md | 1 +
.../engine/flink/operation/ExecuteStatement.scala | 11 +-
.../engine/flink/operation/FlinkOperation.scala | 34 +++-
.../flink/operation/FlinkSQLOperationManager.scala | 23 ++-
.../engine/flink/operation/PlanOnlyStatement.scala | 11 +-
.../flink/result/QueryResultFetchIterator.scala | 176 +++++++++++++++++++++
.../kyuubi/engine/flink/result/ResultSet.scala | 7 +
.../kyuubi/engine/flink/result/ResultSetUtil.scala | 72 ++-------
.../flink/session/FlinkSQLSessionManager.scala | 5 +-
.../flink/operation/FlinkOperationSuite.scala | 25 ++-
.../it/flink/operation/FlinkOperationSuite.scala | 4 +-
.../operation/FlinkOperationSuiteOnYarn.scala | 4 +-
.../org/apache/kyuubi/config/KyuubiConf.scala | 9 ++
13 files changed, 292 insertions(+), 90 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 928ff0ab8..ddb8546c2 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -411,6 +411,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.session.engine.alive.probe.interval | PT10S | The interval for engine alive probe. [...]
| kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. [...]
| kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout [...]
+| kyuubi.session.engine.flink.fetch.timeout | <undefined> | Result fetch timeout for Flink engine. If the timeout is reached, the result fetch would be stopped and the currently fetched results would be returned. If no data are fetched, a TimeoutException would be thrown. [...]
| kyuubi.session.engine.flink.main.resource | <undefined> | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default [...]
| kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. [...]
| kyuubi.session.engine.hive.main.resource | <undefined> | The package used to create Hive engine remote job. If it is undefined, Kyuubi will use the default [...]
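For reference, the new option accepts Kyuubi duration values (ISO-8601 like PT1M, or plain milliseconds). A minimal, hypothetical `kyuubi-defaults.conf` entry:

```properties
# Hypothetical example: give up on an incremental fetch after one minute,
# returning whatever rows were gathered so far (or a TimeoutException if none).
kyuubi.session.engine.flink.fetch.timeout=PT1M
```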
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 4042756b6..0e0c476e2 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink.operation
+import scala.concurrent.duration.Duration
+
import org.apache.flink.api.common.JobID
import org.apache.flink.table.gateway.api.operation.OperationHandle
@@ -32,7 +34,8 @@ class ExecuteStatement(
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long,
- resultMaxRows: Int)
+ resultMaxRows: Int,
+ resultFetchTimeout: Duration)
extends FlinkOperation(session) with Logging {
private val operationLog: OperationLog =
@@ -48,10 +51,6 @@ class ExecuteStatement(
setHasResultSet(true)
}
- override protected def afterRun(): Unit = {
- OperationLog.removeCurrentOperationLog()
- }
-
override protected def runInternal(): Unit = {
addTimeoutMonitor(queryTimeout)
executeStatement()
@@ -64,7 +63,7 @@ class ExecuteStatement(
new OperationHandle(getHandle.identifier),
statement)
jobId = FlinkEngineUtils.getResultJobId(resultFetcher)
- resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows)
+ resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout)
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
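To illustrate the widened constructor above, here is a hypothetical instantiation; the `session` object and the literal values are assumptions for the sketch, not part of the patch:

```scala
import scala.concurrent.duration._

// Sketch: run a statement synchronously with a 30s query timeout,
// the default row cap, and a 10s incremental fetch timeout.
val op = new ExecuteStatement(
  session,
  "SELECT * FROM orders",
  shouldRunAsync = false,
  queryTimeout = 30000L,
  resultMaxRows = 1000000,
  resultFetchTimeout = 10.seconds)
```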
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 5a79d2c0e..1424b721c 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -19,12 +19,15 @@ package org.apache.kyuubi.engine.flink.operation
import java.io.IOException
import java.time.ZoneId
+import java.util.concurrent.TimeoutException
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.mutable.ListBuffer
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.gateway.service.context.SessionContext
import org.apache.flink.table.gateway.service.operation.OperationExecutor
+import org.apache.flink.types.Row
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TTableSchema}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -72,6 +75,10 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
override def close(): Unit = {
cleanup(OperationState.CLOSED)
+ // the result set may be null if the operation ends exceptionally
+ if (resultSet != null) {
+ resultSet.close
+ }
try {
getOperationLog.foreach(_.close())
} catch {
@@ -98,25 +105,42 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
assertState(OperationState.FINISHED)
setHasResultSet(true)
order match {
- case FETCH_NEXT => resultSet.getData.fetchNext()
case FETCH_PRIOR => resultSet.getData.fetchPrior(rowSetSize);
case FETCH_FIRST => resultSet.getData.fetchAbsolute(0);
+ case FETCH_NEXT => // ignored because new data are fetched lazily
+ }
+ val batch = new ListBuffer[Row]
+ try {
+ // there could be null values at the end of the batch
+ // because Flink could return an EOS
+ var rows = 0
+ while (resultSet.getData.hasNext && rows < rowSetSize) {
+ Option(resultSet.getData.next()).foreach { r => batch += r; rows += 1 }
+ }
+ } catch {
+ case e: TimeoutException =>
+ // ignore and return the current batch if there's some data
+ // otherwise, rethrow the timeout exception
+ if (batch.nonEmpty) {
+ debug(s"Timeout fetching more data for $opType operation. " +
+ s"Returning the current fetched data.")
+ } else {
+ throw e
+ }
}
- val token = resultSet.getData.take(rowSetSize)
val timeZone = Option(flinkSession.getSessionConfig.get("table.local-time-zone"))
val zoneId = timeZone match {
case Some(tz) => ZoneId.of(tz)
case None => ZoneId.systemDefault()
}
val resultRowSet = RowSet.resultSetToTRowSet(
- token.toList,
+ batch.toList,
resultSet,
zoneId,
getProtocolVersion)
- resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
- resp.setHasMoreRows(false)
+ resp.setHasMoreRows(resultSet.getData.hasNext)
resp
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 712c13596..d5c0629ee 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -20,6 +20,8 @@ package org.apache.kyuubi.engine.flink.operation
import java.util
import scala.collection.JavaConverters._
+import scala.concurrent.duration.{Duration, DurationLong}
+import scala.language.postfixOps
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
@@ -66,14 +68,31 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
flinkSession.normalizedConf.getOrElse(
ENGINE_FLINK_MAX_ROWS.key,
resultMaxRowsDefault.toString).toInt
+
+ val resultFetchTimeout =
+ flinkSession.normalizedConf.get(ENGINE_FLINK_FETCH_TIMEOUT.key).map(_.toLong milliseconds)
+ .getOrElse(Duration.Inf)
+
val op = mode match {
case NoneMode =>
// FLINK-24427 seals calcite classes which required to access in async mode, considering
// there is no much benefit in async mode, here we just ignore `runAsync` and always run
// statement in sync mode as a workaround
- new ExecuteStatement(session, statement, false, queryTimeout, resultMaxRows)
+ new ExecuteStatement(
+ session,
+ statement,
+ false,
+ queryTimeout,
+ resultMaxRows,
+ resultFetchTimeout)
case mode =>
- new PlanOnlyStatement(session, statement, mode)
+ new PlanOnlyStatement(
+ session,
+ statement,
+ mode,
+ queryTimeout,
+ resultMaxRows,
+ resultFetchTimeout)
}
addOperation(op)
}
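The timeout resolution above falls back to an unbounded wait when the session does not set the option. A minimal sketch of that lookup, with a plain `Map[String, String]` standing in for the normalized session conf:

```scala
import scala.concurrent.duration.{Duration, DurationLong}

// Unset => Duration.Inf, i.e. block until rows arrive (the pre-patch behavior).
def resolveFetchTimeout(conf: Map[String, String]): Duration =
  conf.get("kyuubi.session.engine.flink.fetch.timeout")
    .map(_.toLong.milliseconds)
    .getOrElse(Duration.Inf)
```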
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
index 4f5d8218f..1284bfd73 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink.operation
+import scala.concurrent.duration.Duration
+
import com.google.common.base.Preconditions
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.gateway.api.operation.OperationHandle
@@ -34,7 +36,10 @@ import org.apache.kyuubi.session.Session
class PlanOnlyStatement(
session: Session,
override val statement: String,
- mode: PlanOnlyMode) extends FlinkOperation(session) {
+ mode: PlanOnlyMode,
+ queryTimeout: Long,
+ resultMaxRows: Int,
+ resultFetchTimeout: Duration) extends FlinkOperation(session) {
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
private val lineSeparator: String = System.lineSeparator()
@@ -46,6 +51,7 @@ class PlanOnlyStatement(
}
override protected def runInternal(): Unit = {
+ addTimeoutMonitor(queryTimeout)
try {
val operations = executor.getTableEnvironment.getParser.parse(statement)
Preconditions.checkArgument(
@@ -59,7 +65,8 @@ class PlanOnlyStatement(
val resultFetcher = executor.executeStatement(
new OperationHandle(getHandle.identifier),
statement)
- resultSet = ResultSetUtil.fromResultFetcher(resultFetcher);
+ resultSet =
+ ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout);
case _ => explainOperation(statement)
}
} catch {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
new file mode 100644
index 000000000..60ae08d9d
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+import java.util
+import java.util.concurrent.Executors
+
+import scala.collection.convert.ImplicitConversions._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+import scala.concurrent.duration.Duration
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.catalog.ResolvedSchema
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.data.conversion.DataStructureConverters
+import org.apache.flink.table.gateway.service.result.ResultFetcher
+import org.apache.flink.table.types.DataType
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
+import org.apache.kyuubi.operation.FetchIterator
+
+class QueryResultFetchIterator(
+ resultFetcher: ResultFetcher,
+ maxRows: Int = 1000000,
+ resultFetchTimeout: Duration = Duration.Inf) extends FetchIterator[Row] with Logging {
+
+ val schema: ResolvedSchema = resultFetcher.getResultSchema
+
+ val dataTypes: util.List[DataType] = schema.getColumnDataTypes
+
+ var token: Long = 0
+
+ var pos: Long = 0
+
+ var fetchStart: Long = 0
+
+ var bufferedRows: Array[Row] = new Array[Row](0)
+
+ var hasNext: Boolean = true
+
+ val FETCH_INTERVAL_MS: Long = 1000
+
+ private val executor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("flink-query-iterator-%d").setDaemon(true).build)
+
+ implicit private val executionContext: ExecutionContextExecutor =
+ ExecutionContext.fromExecutor(executor)
+
+ /**
+ * Begin a fetch block, forward from the current position.
+ *
+ * Throws TimeoutException if no data is fetched within the timeout.
+ */
+ override def fetchNext(): Unit = {
+ if (!hasNext) {
+ return
+ }
+ val future = Future(() -> {
+ var fetched = false
+ // if no timeout is set, this would block until some rows are fetched
+ debug(s"Fetching from result store with timeout $resultFetchTimeout ms")
+ while (!fetched && !Thread.interrupted()) {
+ val rs = resultFetcher.fetchResults(token, maxRows - bufferedRows.length)
+ val flinkRs = new FlinkResultSet(rs)
+ // TODO: replace string-based match when Flink 1.16 support is dropped
+ flinkRs.getResultType.name() match {
+ case "EOS" =>
+ debug("EOS received, no more data to fetch.")
+ fetched = true
+ hasNext = false
+ case "NOT_READY" =>
+ // if flink jobs are not ready, continue to retry
+ debug("Result not ready, retrying...")
+ case "PAYLOAD" =>
+ val fetchedData = flinkRs.getData
+ // if no data fetched, continue to retry
+ if (!fetchedData.isEmpty) {
+ debug(s"Fetched ${fetchedData.length} rows from result store.")
+ fetched = true
+ bufferedRows ++= fetchedData.map(rd => convertToRow(rd, dataTypes.toList))
+ fetchStart = pos
+ } else {
+ debug("No data fetched, retrying...")
+ }
+ case _ =>
+ throw new RuntimeException(s"Unexpected result type: ${flinkRs.getResultType}")
+ }
+ if (hasNext) {
+ val nextToken = flinkRs.getNextToken
+ if (nextToken == null) {
+ hasNext = false
+ } else {
+ token = nextToken
+ }
+ }
+ Thread.sleep(FETCH_INTERVAL_MS)
+ }
+ })
+ Await.result(future, resultFetchTimeout)
+ }
+
+ /**
+ * Begin a fetch block, moving the iterator to the given position.
+ * Resets the fetch start offset.
+ *
+ * @param pos index to move a position of iterator.
+ */
+ override def fetchAbsolute(pos: Long): Unit = {
+ val effectivePos = Math.max(pos, 0)
+ if (effectivePos < bufferedRows.length) {
+ this.fetchStart = effectivePos
+ return
+ }
+ throw new IllegalArgumentException(s"Cannot skip to an unreachable position $effectivePos.")
+ }
+
+ override def getFetchStart: Long = fetchStart
+
+ override def getPosition: Long = pos
+
+ /**
+ * @return returns row if any and null if no more rows can be fetched.
+ */
+ override def next(): Row = {
+ if (pos < bufferedRows.length) {
+ debug(s"Fetching from buffered rows at pos $pos.")
+ val row = bufferedRows(pos.toInt)
+ pos += 1
+ if (pos >= maxRows) {
+ hasNext = false
+ }
+ row
+ } else {
+ // block until some rows are fetched or TimeoutException is thrown
+ fetchNext()
+ if (hasNext) {
+ val row = bufferedRows(pos.toInt)
+ pos += 1
+ if (pos >= maxRows) {
+ hasNext = false
+ }
+ row
+ } else {
+ null
+ }
+ }
+ }
+
+ def close(): Unit = {
+ resultFetcher.close()
+ executor.shutdown()
+ }
+
+ private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
+ val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*))
+ converter.toExternal(r).asInstanceOf[Row]
+ }
+}
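A rough consumer of the new iterator (hypothetical driver code, not part of the patch): `next()` serves buffered rows first, blocks in `fetchNext()` until more arrive or the timeout fires, and returns null once the stream ends:

```scala
// Sketch: pull at most 100 rows from an iterator built over a ResultFetcher.
// `ite` is assumed to be a QueryResultFetchIterator instance.
val rows = Iterator
  .continually(if (ite.hasNext) ite.next() else null)
  .takeWhile(_ != null)
  .take(100)
  .toList
```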
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
index 1e94042d0..b8d407297 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -50,6 +50,13 @@ case class ResultSet(
def getColumns: util.List[Column] = columns
def getData: FetchIterator[Row] = data
+
+ def close: Unit = {
+ data match {
+ case queryIte: QueryResultFetchIterator => queryIte.close()
+ case _ =>
+ }
+ }
}
/**
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index c1169528c..8b722f1e5 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -17,25 +17,17 @@
package org.apache.kyuubi.engine.flink.result
-import scala.collection.convert.ImplicitConversions._
-import scala.collection.mutable.ListBuffer
+import scala.concurrent.duration.Duration
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.catalog.Column
-import org.apache.flink.table.data.RowData
-import org.apache.flink.table.data.conversion.DataStructureConverters
import org.apache.flink.table.gateway.service.result.ResultFetcher
-import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
-import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
-
/** Utility object for building ResultSet. */
object ResultSetUtil {
- private val FETCH_ROWS_PER_SECOND = 1000
-
/**
* Build a ResultSet with a column name and a list of String values.
*
@@ -66,63 +58,19 @@ object ResultSetUtil {
.data(Array[Row](Row.of("OK")))
.build
- def fromResultFetcher(resultFetcher: ResultFetcher, maxRows: Int): ResultSet = {
+ def fromResultFetcher(
+ resultFetcher: ResultFetcher,
+ maxRows: Int,
+ resultFetchTimeout: Duration): ResultSet = {
+ if (maxRows <= 0) {
+ throw new IllegalArgumentException("maxRows should be positive")
+ }
val schema = resultFetcher.getResultSchema
- val resultRowData = ListBuffer.newBuilder[RowData]
- var fetched: FlinkResultSet = null
- var token: Long = 0
- var rowNum: Int = 0
- do {
- fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
- val data = fetched.getData
- val slice = data.slice(0, maxRows - rowNum)
- resultRowData ++= slice
- rowNum += slice.size
- token = fetched.getNextToken
- try Thread.sleep(1000L)
- catch {
- case _: InterruptedException => fetched.getNextToken == null
- }
- } while (
- fetched.getNextToken != null &&
- rowNum < maxRows &&
- fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
- )
- val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
+ val ite = new QueryResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(schema.getColumns)
- .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
+ .data(ite)
.build
}
-
- def fromResultFetcher(resultFetcher: ResultFetcher): ResultSet = {
- val schema = resultFetcher.getResultSchema
- val resultRowData = ListBuffer.newBuilder[RowData]
- var fetched: FlinkResultSet = null
- var token: Long = 0
- do {
- fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
- resultRowData ++= fetched.getData
- token = fetched.getNextToken
- try Thread.sleep(1000L)
- catch {
- case _: InterruptedException =>
- }
- } while (
- fetched.getNextToken != null &&
- fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
- )
- val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
- ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(schema.getColumns)
- .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
- .build
- }
-
- private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
- val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*))
- converter.toExternal(r).asInstanceOf[Row]
- }
}
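After this refactor `fromResultFetcher` no longer drains the fetcher eagerly; it wraps it in the lazy iterator. A hedged usage sketch (the `resultFetcher` value is an assumption):

```scala
import scala.concurrent.duration._

// Sketch: a lazily-fetching ResultSet capped at one million rows,
// where any single fetch gives up after 60 seconds.
val resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, 1000000, 60.seconds)
```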
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index c7aa7c3c5..b7cd46217 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -77,9 +77,10 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
+ val fSession = super.getSessionOption(sessionHandle)
+ fSession.foreach(s =>
+ sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
super.closeSession(sessionHandle)
- sessionManager.closeSession(
- new org.apache.flink.table.gateway.api.session.SessionHandle(sessionHandle.identifier))
}
override def stop(): Unit = synchronized {
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 35b59b661..8e7c35a95 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.engine.flink.WithFlinkTestResources
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
-import org.apache.kyuubi.jdbc.hive.KyuubiStatement
+import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement}
import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -676,12 +676,10 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
assert(stopResult1.next())
assert(stopResult1.getString(1) === "OK")
- val selectResult = statement.executeQuery("select * from tbl_a")
- val jobId2 = statement.asInstanceOf[KyuubiStatement].getQueryId
- assert(jobId2 !== null)
- while (!selectResult.next()) {
- Thread.sleep(1000L)
- }
+ val insertResult2 = statement.executeQuery("insert into tbl_b select * from tbl_a")
+ assert(insertResult2.next())
+ val jobId2 = insertResult2.getString(1)
+
val stopResult2 = statement.executeQuery(s"stop job '$jobId2' with savepoint")
assert(stopResult2.getMetaData.getColumnName(1).equals("savepoint path"))
assert(stopResult2.next())
@@ -1252,4 +1250,17 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
}
}
}
+
+ test("test result fetch timeout") {
+ val exception = intercept[KyuubiSQLException](
+ withSessionConf()(Map(ENGINE_FLINK_FETCH_TIMEOUT.key -> "60000"))() {
+ withJdbcStatement("tbl_a") { stmt =>
+ stmt.executeQuery("create table tbl_a (a int) " +
+ "with ('connector' = 'datagen', 'rows-per-second'='0')")
+ val resultSet = stmt.executeQuery("select * from tbl_a")
+ while (resultSet.next()) {}
+ }
+ })
+ assert(exception.getMessage === "Futures timed out after [60000 milliseconds]")
+ }
}
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index 893e0020a..55476bfd0 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -31,7 +31,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
override val conf: KyuubiConf = KyuubiConf()
.set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
.set(ENGINE_TYPE, "FLINK_SQL")
- .set("flink.parallelism.default", "6")
+ .set("flink.parallelism.default", "2")
override protected def jdbcUrl: String = getJdbcUrl
@@ -72,7 +72,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
var success = false
while (resultSet.next() && !success) {
if (resultSet.getString(1) == "parallelism.default" &&
- resultSet.getString(2) == "6") {
+ resultSet.getString(2) == "2") {
success = true
}
}
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
index afa4dce8f..ee6b9bb98 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
@@ -40,7 +40,7 @@ class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster
.set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
.set(ENGINE_TYPE, "FLINK_SQL")
.set("flink.execution.target", "yarn-application")
- .set("flink.parallelism.default", "6")
+ .set("flink.parallelism.default", "2")
super.beforeAll()
}
@@ -81,7 +81,7 @@ class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster
var success = false
while (resultSet.next() && !success) {
if (resultSet.getString(1) == "parallelism.default" &&
- resultSet.getString(2) == "6") {
+ resultSet.getString(2) == "2") {
success = true
}
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 515ffbc34..3f1c3b868 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1319,6 +1319,15 @@ object KyuubiConf {
.intConf
.createWithDefault(1000000)
+ val ENGINE_FLINK_FETCH_TIMEOUT: OptionalConfigEntry[Long] =
+ buildConf("kyuubi.session.engine.flink.fetch.timeout")
+ .doc("Result fetch timeout for Flink engine. If the timeout is reached,
the result " +
+ "fetch would be stopped and the current fetched would be returned. If
no data are " +
+ "fetched, a TimeoutException would be thrown.")
+ .version("1.8.0")
+ .timeConf
+ .createOptional
+
val ENGINE_TRINO_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("kyuubi.session.engine.trino.main.resource")
.doc("The package used to create Trino engine remote job. If it is
undefined," +