This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3439ea03f [KYUUBI #6377] Fix isCommand check and set min rows threshold for saveToFile
3439ea03f is described below
commit 3439ea03f283ab93c47ab8ffcc54231a409dc239
Author: Wang, Fei <[email protected]>
AuthorDate: Thu May 9 08:43:07 2024 -0700
[KYUUBI #6377] Fix isCommand check and set min rows threshold for saveToFile
# :mag: Description
## Issue References 🔗
This pull request fixes #
I found that, with saveToFile enabled and the default min size threshold, even when I run a simple `set` command, the result is still saved to file.
<img width="1718" alt="image"
src="https://github.com/apache/kyuubi/assets/6757692/5bcc0da1-201a-453a-8568-d1bfadd7adef">
I think we need to skip these kinds of queries.
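For illustration, a minimal spark-shell sketch (Spark 3.x assumed; the exact plan class varies by Spark version) of how command statements differ from regular queries at the physical plan level:
```scala
// Command statements such as `set` compile to command-style physical plans,
// while regular queries compile to scan/exchange plans.
spark.sql("set").queryExecution.executedPlan.getClass.getName
// e.g. "org.apache.spark.sql.execution.CommandResultExec"

spark.sql("select * from VALUES(1),(2) AS t(id)").queryExecution.executedPlan.getClass.getName
// e.g. "org.apache.spark.sql.execution.LocalTableScanExec"
```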
## Describe Your Solution 🔧
The `isCommandExec` check compared the executed plan's `nodeName` against fully qualified class names, so it never matched and command results were not excluded; it now compares `executedPlan.getClass.getName`. This change also introduces `kyuubi.operation.result.saveToFile.minRows` (default 10000), so results whose estimated or limited row count falls below the threshold are fetched directly instead of being written to file.
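A condensed sketch of the resulting gating order (the `shouldSave` helper below is illustrative only, not the patched method):
```scala
// Hypothetical condensation of the patched shouldSaveResultToFs logic:
// 1. never save command results,
// 2. skip results whose effective limit or estimated row count is below minRows,
// 3. then apply the existing estimated-size threshold.
def shouldSave(
    isCommand: Boolean,
    estimatedRows: BigInt, // plan stats row count, or the effective LIMIT
    estimatedSize: BigInt, // roughly rows * EstimationUtils.getSizePerRow(output)
    minRows: Long,
    minSize: Long): Boolean =
  !isCommand && estimatedRows >= minRows && minSize > 0 && estimatedSize >= minSize
```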
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
With saveToFile enabled, even command statements such as `set` have their results written to file, because the `isCommandExec` check never matched.
#### Behavior With This Pull Request :tada:
Command statements and results below the `minRows` threshold are fetched directly instead of being saved to file.
#### Related Unit Tests
A new `isCommandExec` test is added in `SparkDatasetHelperSuite`.
---
# Checklist 📝
- [ ] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6377 from turboFei/check_is_DQL.
Closes #6377
da9c2a921 [Wang, Fei] ut
04e20db5f [Wang, Fei] conf
8f20ed84b [Wang, Fei] refine the check
f558dcca5 [Wang, Fei] ut
c81340333 [Wang, Fei] DQL
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
docs/configuration/settings.md | 1 +
.../engine/spark/operation/ExecuteStatement.scala | 8 +++--
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 38 ++++++++++++++--------
.../spark/sql/kyuubi/SparkDatasetHelperSuite.scala | 11 +++++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 +++++
5 files changed, 50 insertions(+), 16 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index e922e3dca..c3231cbb0 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -420,6 +420,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. [...]
 | kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The Spark query result save dir, it should be a public accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query result will delete when query finished. [...]
 | kyuubi.operation.result.saveToFile.enabled | false | The switch for Spark query result save to file. [...]
+| kyuubi.operation.result.saveToFile.minRows | 10000 | The minimum row count of Spark query result to save to file, default value is 10000. [...]
 | kyuubi.operation.result.saveToFile.minSize | 209715200 | The minSize of Spark result save to file, default value is 200 MB.we use spark's `EstimationUtils#getSizePerRowestimate` to estimate the output size of the execution plan. [...]
 | kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. [...]
 | kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. [...]
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index a52d32be9..06193d83c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -172,10 +172,12 @@ class ExecuteStatement(
         })
     } else {
       val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
-      val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+      val resultSaveSizeThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+      val resultSaveRowsThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, spark)
       if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
           resultMaxRows,
-          resultSaveThreshold,
+          resultSaveSizeThreshold,
+          resultSaveRowsThreshold,
           result)) {
         saveFileName =
           Some(
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 2dbfe7348..b78c8b7a3 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -246,22 +246,33 @@ object SparkDatasetHelper extends Logging {
     case _ => None
   }
 
-  def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
-    if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
+  def shouldSaveResultToFs(
+      resultMaxRows: Int,
+      minSize: Long,
+      minRows: Long,
+      result: DataFrame): Boolean = {
+    if (isCommandExec(result) ||
+      (resultMaxRows > 0 && resultMaxRows < minRows) ||
+      result.queryExecution.optimizedPlan.stats.rowCount.getOrElse(
+        BigInt(Long.MaxValue)) < minRows) {
       return false
     }
-    val finalLimit = optimizedPlanLimit(result.queryExecution) match {
-      case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
-      case Some(limit) => limit
-      case None => resultMaxRows
+    val finalLimit: Option[Long] = optimizedPlanLimit(result.queryExecution) match {
+      case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows))
+      case Some(limit) => Some(limit)
+      case None if resultMaxRows > 0 => Some(resultMaxRows)
+      case _ => None
     }
-    lazy val stats = if (finalLimit > 0) {
-      finalLimit * EstimationUtils.getSizePerRow(
-        result.queryExecution.executedPlan.output)
-    } else {
-      result.queryExecution.optimizedPlan.stats.sizeInBytes
+    if (finalLimit.exists(_ < minRows)) {
+      return false
     }
-    lazy val colSize =
+    val sizeInBytes = result.queryExecution.optimizedPlan.stats.sizeInBytes
+    val stats = finalLimit.map { limit =>
+      val estimateSize =
+        limit * EstimationUtils.getSizePerRow(result.queryExecution.executedPlan.output)
+      estimateSize min sizeInBytes
+    }.getOrElse(sizeInBytes)
+    val colSize =
       if (result == null || result.schema.isEmpty) {
         0
       } else {
@@ -270,7 +281,8 @@ object SparkDatasetHelper extends Logging {
     minSize > 0 && colSize > 0 && stats >= minSize
   }
 
-  private def isCommandExec(nodeName: String): Boolean = {
+  def isCommandExec(result: DataFrame): Boolean = {
+    val nodeName = result.queryExecution.executedPlan.getClass.getName
     nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
       nodeName == "org.apache.spark.sql.execution.CommandResultExec"
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
index 8ac00e602..791cb12b9 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
@@ -42,4 +42,15 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine {
         spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
     }
   }
+
+  test("isCommandExec") {
+    var query = "set"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "explain set"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "show tables"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "select * from VALUES(1),(2),(3),(4) AS t(id)"
+    assert(!SparkDatasetHelper.isCommandExec(spark.sql(query)))
+  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 22d31f7df..d8fec5833 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2066,6 +2066,14 @@ object KyuubiConf {
       .checkValue(_ > 0, "must be positive value")
       .createWithDefault(200 * 1024 * 1024)
 
+  val OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS: ConfigEntry[Long] =
+    buildConf("kyuubi.operation.result.saveToFile.minRows")
+      .doc("The minimum row count of Spark query result to save to file, default value is 10000.")
+      .version("1.9.1")
+      .longConf
+      .checkValue(_ > 0, "must be positive value")
+      .createWithDefault(10000)
+
   val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
     buildConf("kyuubi.operation.incremental.collect")
       .internal