This is an automated email from the ASF dual-hosted git repository.
paullin pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 331a1ddcb [KYUUBI #5814] [BUG][FLINK] Skip the max-rows limitation if the statement is not DQL
331a1ddcb is described below
commit 331a1ddcb636bf3151f3045a1e1c7312b94d943b
Author: Paul Lin <[email protected]>
AuthorDate: Tue Dec 5 15:18:37 2023 +0800
[KYUUBI #5814] [BUG][FLINK] Skip the max-rows limitation if the statement is not DQL
# :mag: Description
## Issue References :link:
Currently, the Flink max-rows limitation is applied to DDL statements as well. This is unexpected and should be fixed.
## Describe Your Solution :wrench:
Skip the max-rows limitation when the result set is not a query result.
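In essence, the iterator now checks whether the backing result is a query result (on Flink 1.17+ via the `ResultFetcher#isQueryResult` field, read reflectively; Flink 1.16 and below keep the old always-capped behavior) and widens the row cap to `Int.MaxValue` for non-queries. A minimal self-contained sketch of the decision logic follows; the `ResultFetcherStub` class and the plain string version comparison are simplifications for illustration, not the real Flink or Kyuubi APIs:

```scala
// Sketch only: stands in for Flink's ResultFetcher, whose isQueryResult
// field exists only in Flink 1.17+ (hence the reflective access in the
// actual patch).
final class ResultFetcherStub(val isQueryResult: Boolean)

object MaxRowsSketch {
  def effectiveMaxRows(flinkVersion: String, fetcher: ResultFetcherStub, maxRows: Int): Int = {
    // For Flink 1.16 and below the flag is unavailable, so the previous
    // behavior (always apply maxRows) is kept. Lexicographic comparison is
    // good enough for "1.16" vs "1.17" here; the real code compares parsed
    // runtime versions.
    val isQuery = flinkVersion < "1.17" || fetcher.isQueryResult
    if (isQuery) maxRows else Int.MaxValue
  }

  def main(args: Array[String]): Unit = {
    println(effectiveMaxRows("1.17", new ResultFetcherStub(true), 10))  // 10: DQL stays capped
    println(effectiveMaxRows("1.17", new ResultFetcherStub(false), 10)) // 2147483647: DDL uncapped
    println(effectiveMaxRows("1.16", new ResultFetcherStub(false), 10)) // 10: old behavior kept
  }
}
```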
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan :test_tube:
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
FlinkOperationSuite - ensure result max rows
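For reference, a hedged sketch of how a client would observe the fix over JDBC, mirroring the new test case; the JDBC URL is a placeholder, and it assumes the session's Flink engine max-rows option is set to 10:

```scala
import java.sql.DriverManager

// Assumes a reachable Kyuubi server and a session configured with a
// max-rows limit of 10, as in the new FlinkOperationSuite case.
object ShowTablesNotTruncated {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
    val stmt = conn.createStatement()
    try {
      // Create one more table than the max-rows cap allows.
      for (i <- 0 to 10) {
        stmt.execute(s"CREATE TABLE tbl_src$i (a BIGINT) WITH ('connector' = 'blackhole')")
      }
      // SHOW TABLES is not a DQL query, so with this fix all 11 rows come
      // back even though max-rows is 10.
      val rs = stmt.executeQuery("SHOW TABLES")
      var rows = 0
      while (rs.next()) rows += 1
      assert(rows == 11)
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```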
---
# Checklists
## :memo: Author Self Checklist
- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
## :memo: Committer Pre-Merge Checklist
- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone is correctly set.
- [ ] Test coverage is okay.
- [ ] Assignees are selected.
- [ ] Minimum number of approvals is met.
- [ ] No changes are requested.
**Be nice. Be informative.**
Closes #5814 from link3280/fix/flink_ddl_max_rows.
Closes #5814
07c146a77 [Paul Lin] [FLINK] Fix if condition
28981fc60 [Paul Lin] [FLINK] Fix if condition
0c7cce33d [Paul Lin] [FLINK] Fix Flink 1.16 compatibility
869502e8c [Paul Lin] [FLINK] Update thread name
60e76cc57 [Paul Lin] [FLINK] Replace scala bounded reflection utils with the common ones
2d8b9c163 [Paul Lin] [FLINK] Skip the max-rows limitation if the statement is not DQL
Authored-by: Paul Lin <[email protected]>
Signed-off-by: Paul Lin <[email protected]>
(cherry picked from commit 51c3e0372feee5e7dd8983f85076242c4a5fed40)
Signed-off-by: Paul Lin <[email protected]>
---
...r.scala => IncrementalResultFetchIterator.scala} | 21 ++++++++++++++++-----
.../kyuubi/engine/flink/result/ResultSet.scala | 2 +-
.../kyuubi/engine/flink/result/ResultSetUtil.scala | 2 +-
.../flink/operation/FlinkOperationSuite.scala | 16 ++++++++++++++++
4 files changed, 34 insertions(+), 7 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
similarity index 88%
rename from externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
rename to externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
index 60ae08d9d..60c92d9af 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/IncrementalResultFetchIterator.scala
@@ -34,10 +34,12 @@ import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
import org.apache.kyuubi.operation.FetchIterator
+import org.apache.kyuubi.util.reflect.DynFields
-class QueryResultFetchIterator(
+class IncrementalResultFetchIterator(
resultFetcher: ResultFetcher,
maxRows: Int = 1000000,
resultFetchTimeout: Duration = Duration.Inf) extends FetchIterator[Row] with Logging {
@@ -58,8 +60,17 @@ class QueryResultFetchIterator(
val FETCH_INTERVAL_MS: Long = 1000
+ // for Flink 1.16 and below, isQueryResult is not supported
+ val isQueryResult: Boolean =
+ FlinkEngineUtils.FLINK_RUNTIME_VERSION < "1.17" ||
+ DynFields.builder
+ .hiddenImpl(classOf[ResultFetcher], "isQueryResult")
+ .build[Boolean](resultFetcher).get()
+
+ val effectiveMaxRows: Int = if (isQueryResult) maxRows else Int.MaxValue
+
private val executor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("flink-query-iterator-%d").setDaemon(true).build)
+ new ThreadFactoryBuilder().setNameFormat("flink-result-iterator-%d").setDaemon(true).build)
implicit private val executionContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(executor)
@@ -78,7 +89,7 @@ class QueryResultFetchIterator(
// if no timeout is set, this would block until some rows are fetched
debug(s"Fetching from result store with timeout $resultFetchTimeout ms")
while (!fetched && !Thread.interrupted()) {
- val rs = resultFetcher.fetchResults(token, maxRows - bufferedRows.length)
+ val rs = resultFetcher.fetchResults(token, effectiveMaxRows - bufferedRows.length)
val flinkRs = new FlinkResultSet(rs)
// TODO: replace string-based match when Flink 1.16 support is dropped
flinkRs.getResultType.name() match {
@@ -144,7 +155,7 @@ class QueryResultFetchIterator(
debug(s"Fetching from buffered rows at pos $pos.")
val row = bufferedRows(pos.toInt)
pos += 1
- if (pos >= maxRows) {
+ if (pos >= effectiveMaxRows) {
hasNext = false
}
row
@@ -154,7 +165,7 @@ class QueryResultFetchIterator(
if (hasNext) {
val row = bufferedRows(pos.toInt)
pos += 1
- if (pos >= maxRows) {
+ if (pos >= effectiveMaxRows) {
hasNext = false
}
row
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
index b8d407297..f9d3de0ab 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -53,7 +53,7 @@ case class ResultSet(
def close: Unit = {
data match {
- case queryIte: QueryResultFetchIterator => queryIte.close()
+ case incIte: IncrementalResultFetchIterator => incIte.close()
case _ =>
}
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index 8b722f1e5..9ddfa7a0b 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -66,7 +66,7 @@ object ResultSetUtil {
throw new IllegalArgumentException("maxRows should be positive")
}
val schema = resultFetcher.getResultSchema
- val ite = new QueryResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
+ val ite = new IncrementalResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(schema.getColumns)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 5c05e2f23..6f4919477 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -1148,6 +1148,22 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
assert(rows === 200)
}
}
+ if (FLINK_RUNTIME_VERSION >= "1.17") {
+ withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
+ withJdbcStatement() { statement =>
+ for (i <- 0 to 10) {
+ statement.execute(s"create table tbl_src$i (a bigint) " +
+ s"with ('connector' = 'blackhole')")
+ }
+ val resultSet = statement.executeQuery("show tables")
+ var rows = 0
+ while (resultSet.next()) {
+ rows += 1
+ }
+ assert(rows === 11)
+ }
+ }
+ }
}
test("execute statement - add/show jar") {