This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new e54d431  [KYUUBI #1883] Support max result rows for Flink queries
e54d431 is described below

commit e54d431024e1fd614903bd960c0917da2d750937
Author: Paul Lin <[email protected]>
AuthorDate: Tue Feb 22 16:49:37 2022 +0800

    [KYUUBI #1883] Support max result rows for Flink queries
    
    ### _Why are the changes needed?_
    
    Currently, the Flink engine would pull all result rows into memory before
returning them to the client. This would be problematic for large result sets and
infinite result sets.
    
    This is a sub-task of KPIP-2 
https://github.com/apache/incubator-kyuubi/issues/1322.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1938 from link3280/feature/FLINK-1883.
    
    Closes #1883
    
    80020cee [Paul Lin] Update 
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
    1b958221 [Paul Lin] [KYUUBI #1883] Avoid allocating too much buffer space
    5be7535c [Paul Lin] [KYUUBI #1883] Support max result rows for Flink queries
    
    Authored-by: Paul Lin <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 docs/deployment/settings.md                               |  1 +
 .../kyuubi/engine/flink/operation/ExecuteStatement.scala  | 15 ++++++++++-----
 .../engine/flink/operation/FlinkSQLOperationManager.scala |  8 +++++++-
 .../engine/flink/operation/FlinkOperationSuite.scala      | 14 ++++++++++++++
 .../flink/operation/LegacyFlinkOperationSuite.scala       |  3 ++-
 .../main/scala/org/apache/kyuubi/config/KyuubiConf.scala  |  9 +++++++++
 6 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 4224f63..365912a 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -318,6 +318,7 @@ Key | Default | Meaning | Type | Since
 <code>kyuubi.session.conf.restrict.list</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>A comma separated list of 
restricted keys. If the client connection contains any of them, the connection 
will be rejected explicitly during engine bootstrap and connection setup. Note 
that this rule is for server-side protection defined via administrators to 
prevent some essential configs from tamperin [...]
 <code>kyuubi.session.engine.check.interval</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>The check interval for engine 
timeout</div>|<div style='width: 30pt'>duration</div>|<div style='width: 
20pt'>1.0.0</div>
 <code>kyuubi.session.engine.flink.main.resource</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used 
to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the 
default</div>|<div style='width: 30pt'>string</div>|<div style='width: 
20pt'>1.4.0</div>
+<code>kyuubi.session.engine.flink.max.rows</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>1000000</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>Max rows of 
Flink query results. For batch queries, rows that exceeds the limit would be 
ignored. For streaming queries, the query would be canceled if the limit is 
reached.</div>|<div style='width: 30pt'>int</div>|<div style='width: 
20pt'>1.5.0</div>
 <code>kyuubi.session.engine.idle.timeout</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine 
will self-terminate when it's not accessed for this duration. 0 or negative 
means not to self-terminate.</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.initialize.timeout</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the 
background engine, e.g. SparkSQLEngine.</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.launch.async</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>When opening kyuubi session, 
whether to launch backend engine asynchronously. When true, the Kyuubi server 
will set up the connection with the client without delay as the backend engine 
will be created asynchronously.</div>|<div style='width: 
30pt'>boolean</div>|<div style='width: 20pt'>1.4.0</div>
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 855f803..96aff0e 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -41,7 +41,8 @@ class ExecuteStatement(
     session: Session,
     override val statement: String,
     override val shouldRunAsync: Boolean,
-    queryTimeout: Long)
+    queryTimeout: Long,
+    resultMaxRows: Int)
   extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with 
Logging {
 
   private val operationLog: OperationLog =
@@ -132,12 +133,16 @@ class ExecuteStatement(
       while (loop) {
         Thread.sleep(50) // slow the processing down
 
-        val result = executor.snapshotResult(sessionId, resultId, 2)
+        val pageSize = Math.min(500, resultMaxRows)
+        val result = executor.snapshotResult(sessionId, resultId, pageSize)
         result.getType match {
           case TypedResult.ResultType.PAYLOAD =>
-            rows.clear()
             (1 to result.getPayload).foreach { page =>
-              rows ++= executor.retrieveResultPage(resultId, page).asScala
+              if (rows.size < resultMaxRows) {
+                rows ++= executor.retrieveResultPage(resultId, page).asScala
+              } else {
+                loop = false
+              }
             }
           case TypedResult.ResultType.EOS => loop = false
           case TypedResult.ResultType.EMPTY =>
@@ -147,7 +152,7 @@ class ExecuteStatement(
       resultSet = ResultSet.builder
         .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .columns(resultDescriptor.getResultSchema.getColumns)
-        .data(rows.toArray[Row])
+        .data(rows.slice(0, resultMaxRows).toArray[Row])
         .build
     } finally {
       if (resultId != null) {
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 2ec7ed3..d89a591 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -33,6 +33,8 @@ class FlinkSQLOperationManager extends 
OperationManager("FlinkSQLOperationManage
 
   private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
 
+  private lazy val resultMaxRowsDefault = getConf.get(ENGINE_FLINK_MAX_ROWS)
+
   override def newExecuteStatementOperation(
       session: Session,
       statement: String,
@@ -43,9 +45,13 @@ class FlinkSQLOperationManager extends 
OperationManager("FlinkSQLOperationManage
     val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
       OPERATION_PLAN_ONLY.key,
       operationModeDefault)
+    val resultMaxRows =
+      flinkSession.normalizedConf.getOrElse(
+        ENGINE_FLINK_MAX_ROWS.key,
+        resultMaxRowsDefault.toString).toInt
     val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
       case NONE =>
-        new ExecuteStatement(session, statement, runAsync, queryTimeout)
+        new ExecuteStatement(session, statement, runAsync, queryTimeout, 
resultMaxRows)
       case mode =>
         new PlanOnlyStatement(session, statement, mode)
     }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 9039c19..46e0977 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -761,4 +761,18 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with 
HiveJDBCTestHelper {
         .getStringVal.getValues.get(0) === "tmp.hello")
     }
   }
+
+  test("ensure result max rows") {
+    withSessionConf()(Map(KyuubiConf.ENGINE_FLINK_MAX_ROWS.key -> 
"200"))(Map.empty) {
+      withJdbcStatement() { statement =>
+        statement.execute("create table tbl_src (a bigint) with ('connector' = 
'datagen')")
+        val resultSet = statement.executeQuery(s"select a from tbl_src")
+        var rows = 0
+        while (resultSet.next()) {
+          rows += 1
+        }
+        assert(rows === 200)
+      }
+    }
+  }
 }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
index 7a0ff8f..53c1e9d 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
@@ -116,7 +116,8 @@ class LegacyFlinkOperationSuite extends KyuubiFunSuite {
   }
 
   test("execute statement - select column name with dots") {
-    val executeStatementOp = new ExecuteStatement(flinkSession, "select 
'tmp.hello'", false, -1)
+    val executeStatementOp =
+      new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1, 500)
     val executor = createLocalExecutor
     executor.openSession("test-session")
     executeStatementOp.setExecutor(executor)
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 1d4bf04..3ed3e4e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -594,6 +594,15 @@ object KyuubiConf {
       .stringConf
       .createOptional
 
+  val ENGINE_FLINK_MAX_ROWS: ConfigEntry[Int] =
+    buildConf("session.engine.flink.max.rows")
+      .doc("Max rows of Flink query results. For batch queries, rows that 
exceeds the limit " +
+        "would be ignored. For streaming queries, the query would be canceled 
if the limit " +
+        "is reached.")
+      .version("1.5.0")
+      .intConf
+      .createWithDefault(1000000)
+
   val ENGINE_TRINO_MAIN_RESOURCE: OptionalConfigEntry[String] =
     buildConf("session.engine.trino.main.resource")
       .doc("The package used to create Trino engine remote job. If it is 
undefined," +

Reply via email to