This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new e54d431 [KYUUBI #1883] Support max result rows for Flink queries
e54d431 is described below
commit e54d431024e1fd614903bd960c0917da2d750937
Author: Paul Lin <[email protected]>
AuthorDate: Tue Feb 22 16:49:37 2022 +0800
[KYUUBI #1883] Support max result rows for Flink queries
### _Why are the changes needed?_
Currently, the Flink engine would pull all result rows into memory before
returning them to the client. This would be problematic for large result sets and
infinite result sets.
This is a sub-task of KPIP-2
https://github.com/apache/incubator-kyuubi/issues/1322.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1938 from link3280/feature/FLINK-1883.
Closes #1883
80020cee [Paul Lin] Update
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
1b958221 [Paul Lin] [KYUUBI #1883] Avoid allocating too much buffer space
5be7535c [Paul Lin] [KYUUBI #1883] Support max result rows for Flink queries
Authored-by: Paul Lin <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
docs/deployment/settings.md | 1 +
.../kyuubi/engine/flink/operation/ExecuteStatement.scala | 15 ++++++++++-----
.../engine/flink/operation/FlinkSQLOperationManager.scala | 8 +++++++-
.../engine/flink/operation/FlinkOperationSuite.scala | 14 ++++++++++++++
.../flink/operation/LegacyFlinkOperationSuite.scala | 3 ++-
.../main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 9 +++++++++
6 files changed, 43 insertions(+), 7 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 4224f63..365912a 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -318,6 +318,7 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.session.conf.restrict.list</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>A comma separated list of
restricted keys. If the client connection contains any of them, the connection
will be rejected explicitly during engine bootstrap and connection setup. Note
that this rule is for server-side protection defined via administrators to
prevent some essential configs from tamperin [...]
<code>kyuubi.session.engine.check.interval</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>The check interval for engine
timeout</div>|<div style='width: 30pt'>duration</div>|<div style='width:
20pt'>1.0.0</div>
<code>kyuubi.session.engine.flink.main.resource</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used
to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the
default</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.4.0</div>
+<code>kyuubi.session.engine.flink.max.rows</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>1000000</div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>Max rows of
Flink query results. For batch queries, rows that exceeds the limit would be
ignored. For streaming queries, the query would be canceled if the limit is
reached.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.5.0</div>
<code>kyuubi.session.engine.idle.timeout</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine
will self-terminate when it's not accessed for this duration. 0 or negative
means not to self-terminate.</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.0.0</div>
<code>kyuubi.session.engine.initialize.timeout</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the
background engine, e.g. SparkSQLEngine.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.session.engine.launch.async</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>When opening kyuubi session,
whether to launch backend engine asynchronously. When true, the Kyuubi server
will set up the connection with the client without delay as the backend engine
will be created asynchronously.</div>|<div style='width:
30pt'>boolean</div>|<div style='width: 20pt'>1.4.0</div>
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 855f803..96aff0e 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -41,7 +41,8 @@ class ExecuteStatement(
session: Session,
override val statement: String,
override val shouldRunAsync: Boolean,
- queryTimeout: Long)
+ queryTimeout: Long,
+ resultMaxRows: Int)
extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with
Logging {
private val operationLog: OperationLog =
@@ -132,12 +133,16 @@ class ExecuteStatement(
while (loop) {
Thread.sleep(50) // slow the processing down
- val result = executor.snapshotResult(sessionId, resultId, 2)
+ val pageSize = Math.min(500, resultMaxRows)
+ val result = executor.snapshotResult(sessionId, resultId, pageSize)
result.getType match {
case TypedResult.ResultType.PAYLOAD =>
- rows.clear()
(1 to result.getPayload).foreach { page =>
- rows ++= executor.retrieveResultPage(resultId, page).asScala
+ if (rows.size < resultMaxRows) {
+ rows ++= executor.retrieveResultPage(resultId, page).asScala
+ } else {
+ loop = false
+ }
}
case TypedResult.ResultType.EOS => loop = false
case TypedResult.ResultType.EMPTY =>
@@ -147,7 +152,7 @@ class ExecuteStatement(
resultSet = ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(resultDescriptor.getResultSchema.getColumns)
- .data(rows.toArray[Row])
+ .data(rows.slice(0, resultMaxRows).toArray[Row])
.build
} finally {
if (resultId != null) {
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 2ec7ed3..d89a591 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -33,6 +33,8 @@ class FlinkSQLOperationManager extends
OperationManager("FlinkSQLOperationManage
private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
+ private lazy val resultMaxRowsDefault = getConf.get(ENGINE_FLINK_MAX_ROWS)
+
override def newExecuteStatementOperation(
session: Session,
statement: String,
@@ -43,9 +45,13 @@ class FlinkSQLOperationManager extends
OperationManager("FlinkSQLOperationManage
val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
OPERATION_PLAN_ONLY.key,
operationModeDefault)
+ val resultMaxRows =
+ flinkSession.normalizedConf.getOrElse(
+ ENGINE_FLINK_MAX_ROWS.key,
+ resultMaxRowsDefault.toString).toInt
val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
case NONE =>
- new ExecuteStatement(session, statement, runAsync, queryTimeout)
+ new ExecuteStatement(session, statement, runAsync, queryTimeout,
resultMaxRows)
case mode =>
new PlanOnlyStatement(session, statement, mode)
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 9039c19..46e0977 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -761,4 +761,18 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with
HiveJDBCTestHelper {
.getStringVal.getValues.get(0) === "tmp.hello")
}
}
+
+ test("ensure result max rows") {
+ withSessionConf()(Map(KyuubiConf.ENGINE_FLINK_MAX_ROWS.key ->
"200"))(Map.empty) {
+ withJdbcStatement() { statement =>
+ statement.execute("create table tbl_src (a bigint) with ('connector' =
'datagen')")
+ val resultSet = statement.executeQuery(s"select a from tbl_src")
+ var rows = 0
+ while (resultSet.next()) {
+ rows += 1
+ }
+ assert(rows === 200)
+ }
+ }
+ }
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
index 7a0ff8f..53c1e9d 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
@@ -116,7 +116,8 @@ class LegacyFlinkOperationSuite extends KyuubiFunSuite {
}
test("execute statement - select column name with dots") {
- val executeStatementOp = new ExecuteStatement(flinkSession, "select
'tmp.hello'", false, -1)
+ val executeStatementOp =
+ new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1, 500)
val executor = createLocalExecutor
executor.openSession("test-session")
executeStatementOp.setExecutor(executor)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 1d4bf04..3ed3e4e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -594,6 +594,15 @@ object KyuubiConf {
.stringConf
.createOptional
+ val ENGINE_FLINK_MAX_ROWS: ConfigEntry[Int] =
+ buildConf("session.engine.flink.max.rows")
+ .doc("Max rows of Flink query results. For batch queries, rows that
exceeds the limit " +
+ "would be ignored. For streaming queries, the query would be canceled
if the limit " +
+ "is reached.")
+ .version("1.5.0")
+ .intConf
+ .createWithDefault(1000000)
+
val ENGINE_TRINO_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("session.engine.trino.main.resource")
.doc("The package used to create Trino engine remote job. If it is
undefined," +