This is an automated email from the ASF dual-hosted git repository.

paullin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 51c3e0372 [KYUUBI #5814] [BUG][FLINK] Skip the max-rows limitation if 
the statement is not DQL
51c3e0372 is described below

commit 51c3e0372feee5e7dd8983f85076242c4a5fed40
Author: Paul Lin <[email protected]>
AuthorDate: Tue Dec 5 15:18:37 2023 +0800

    [KYUUBI #5814] [BUG][FLINK] Skip the max-rows limitation if the statement 
is not DQL
    
    # :mag: Description
    ## Issue References 🔗
    Currently, the Flink max-rows limitation is also applied to DDL statements. This is unexpected and should be fixed.
    
    ## Describe Your Solution 🔧
    
    Skip the max-rows limitation when the result set is not a query result.
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    FlinkOperationSuite - ensure result max rows
    
    ---
    
    # Checklists
    ## ๐Ÿ“ Author Self Checklist
    
    - [x] My code follows the [style 
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
 of this project
    - [x] I have performed a self-review
    - [x] I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [x] My changes generate no new warnings
    - [x] I have added tests that prove my fix is effective or that my feature 
works
    - [x] New and existing unit tests pass locally with my changes
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## ๐Ÿ“ Committer Pre-Merge Checklist
    
    - [ ] Pull request title is okay.
    - [ ] No license issues.
    - [ ] Milestone correctly set?
    - [ ] Test coverage is ok
    - [ ] Assignees are selected.
    - [ ] Minimum number of approvals
    - [ ] No changes are requested
    
    **Be nice. Be informative.**
    
    Closes #5814 from link3280/fix/flink_ddl_max_rows.
    
    Closes #5814
    
    07c146a77 [Paul Lin] [FLINK] Fix if condition
    28981fc60 [Paul Lin] [FLINK] Fix if condition
    0c7cce33d [Paul Lin] [FLINK] Fix Flink 1.16 compatibility
    869502e8c [Paul Lin] [FLINK] Update thread name
    60e76cc57 [Paul Lin] [FLINK] Replace scala bounded reflection utils with 
the common ones
    2d8b9c163 [Paul Lin] [FLINK] Skip the max-rows limitation if the statement 
is not DQL
    
    Authored-by: Paul Lin <[email protected]>
    Signed-off-by: Paul Lin <[email protected]>
---
 ...r.scala => IncrementalResultFetchIterator.scala} | 21 ++++++++++++++++-----
 .../kyuubi/engine/flink/result/ResultSet.scala      |  2 +-
 .../kyuubi/engine/flink/result/ResultSetUtil.scala  |  2 +-
 .../flink/operation/FlinkOperationSuite.scala       | 16 ++++++++++++++++
 4 files changed, 34 insertions(+), 7 deletions(-)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
similarity index 88%
rename from 
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
rename to 
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
index 60ae08d9d..60c92d9af 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
@@ -34,10 +34,12 @@ import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
 import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
 import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
 import org.apache.kyuubi.operation.FetchIterator
+import org.apache.kyuubi.util.reflect.DynFields
 
-class QueryResultFetchIterator(
+class IncrementalResultFetchIterator(
     resultFetcher: ResultFetcher,
     maxRows: Int = 1000000,
     resultFetchTimeout: Duration = Duration.Inf) extends FetchIterator[Row] 
with Logging {
@@ -58,8 +60,17 @@ class QueryResultFetchIterator(
 
   val FETCH_INTERVAL_MS: Long = 1000
 
+  // for Flink 1.16 and below, isQueryResult is not supported
+  val isQueryResult: Boolean =
+    FlinkEngineUtils.FLINK_RUNTIME_VERSION < "1.17" ||
+      DynFields.builder
+        .hiddenImpl(classOf[ResultFetcher], "isQueryResult")
+        .build[Boolean](resultFetcher).get()
+
+  val effectiveMaxRows: Int = if (isQueryResult) maxRows else Int.MaxValue
+
   private val executor = Executors.newSingleThreadScheduledExecutor(
-    new 
ThreadFactoryBuilder().setNameFormat("flink-query-iterator-%d").setDaemon(true).build)
+    new 
ThreadFactoryBuilder().setNameFormat("flink-result-iterator-%d").setDaemon(true).build)
 
   implicit private val executionContext: ExecutionContextExecutor =
     ExecutionContext.fromExecutor(executor)
@@ -78,7 +89,7 @@ class QueryResultFetchIterator(
       // if no timeout is set, this would block until some rows are fetched
       debug(s"Fetching from result store with timeout $resultFetchTimeout ms")
       while (!fetched && !Thread.interrupted()) {
-        val rs = resultFetcher.fetchResults(token, maxRows - 
bufferedRows.length)
+        val rs = resultFetcher.fetchResults(token, effectiveMaxRows - 
bufferedRows.length)
         val flinkRs = new FlinkResultSet(rs)
         // TODO: replace string-based match when Flink 1.16 support is dropped
         flinkRs.getResultType.name() match {
@@ -144,7 +155,7 @@ class QueryResultFetchIterator(
       debug(s"Fetching from buffered rows at pos $pos.")
       val row = bufferedRows(pos.toInt)
       pos += 1
-      if (pos >= maxRows) {
+      if (pos >= effectiveMaxRows) {
         hasNext = false
       }
       row
@@ -154,7 +165,7 @@ class QueryResultFetchIterator(
       if (hasNext) {
         val row = bufferedRows(pos.toInt)
         pos += 1
-        if (pos >= maxRows) {
+        if (pos >= effectiveMaxRows) {
           hasNext = false
         }
         row
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
index b8d407297..f9d3de0ab 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -53,7 +53,7 @@ case class ResultSet(
 
   def close: Unit = {
     data match {
-      case queryIte: QueryResultFetchIterator => queryIte.close()
+      case incIte: IncrementalResultFetchIterator => incIte.close()
       case _ =>
     }
   }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index 583c64fc6..032c86ac1 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -73,7 +73,7 @@ object ResultSetUtil {
       throw new IllegalArgumentException("maxRows should be positive")
     }
     val schema = resultFetcher.getResultSchema
-    val ite = new QueryResultFetchIterator(resultFetcher, maxRows, 
resultFetchTimeout)
+    val ite = new IncrementalResultFetchIterator(resultFetcher, maxRows, 
resultFetchTimeout)
     ResultSet.builder
       .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
       .columns(schema.getColumns)
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 2e1a577ef..5995c2218 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -1148,6 +1148,22 @@ abstract class FlinkOperationSuite extends 
HiveJDBCTestHelper with WithFlinkTest
         assert(rows === 200)
       }
     }
+    if (FLINK_RUNTIME_VERSION >= "1.17") {
+      withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
+        withJdbcStatement() { statement =>
+          for (i <- 0 to 10) {
+            statement.execute(s"create table tbl_src$i (a bigint) " +
+              s"with ('connector' = 'blackhole')")
+          }
+          val resultSet = statement.executeQuery("show tables")
+          var rows = 0
+          while (resultSet.next()) {
+            rows += 1
+          }
+          assert(rows === 11)
+        }
+      }
+    }
   }
 
   test("execute statement - add/show jar") {

Reply via email to