This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 89a2c0c55 [KYUUBI #6377] Fix isCommand check and set min rows threshold for saveToFile
89a2c0c55 is described below

commit 89a2c0c550165ab6d57a8b5bdcb89460a0b8484d
Author: Wang, Fei <[email protected]>
AuthorDate: Thu May 9 08:43:07 2024 -0700

    [KYUUBI #6377] Fix isCommand check and set min rows threshold for saveToFile
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #
    I found that, with saveToFile enabled and the default min size threshold, even a simple `set` command has its result saved to file.
    <img width="1718" alt="image" src="https://github.com/apache/kyuubi/assets/6757692/5bcc0da1-201a-453a-8568-d1bfadd7adef">
    
    I think we need to skip this kind of query.
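    
    For illustration, a minimal sketch (not part of this patch) of how a command statement reveals itself through the executed plan's class, which is the idea the fix relies on:
    
    ```scala
    // Assumes a live SparkSession named `spark`.
    val plan = spark.sql("set").queryExecution.executedPlan
    // Commands such as SET or SHOW TABLES produce a command node as the executed
    // plan; its fully qualified class name identifies it reliably:
    val isCommand = Seq(
      "org.apache.spark.sql.execution.command.ExecutedCommandExec",
      "org.apache.spark.sql.execution.CommandResultExec").contains(plan.getClass.getName)
    ```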
    
    ## Describe Your Solution 🔧
    
    - Fix the `isCommandExec` check: it compared the executed plan's `nodeName` against fully qualified class names, which never matched. It now compares `executedPlan.getClass.getName`, so command statements like `set` are reliably detected and skipped.
    - Add `kyuubi.operation.result.saveToFile.minRows` (default 10000): results whose estimated row count or effective limit is below this threshold are returned directly instead of being saved to file.
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    With saveToFile enabled, even command statements such as `set` have their results written to file.
    
    #### Behavior With This Pull Request :tada:
    
    Command statements are skipped, and results whose estimated row count falls below `kyuubi.operation.result.saveToFile.minRows` are returned directly instead of being saved to file.
    
    #### Related Unit Tests
    
    Added an `isCommandExec` test in `SparkDatasetHelperSuite`.
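    
    As a quick sanity check, a hedged sketch (illustrative, not part of the patch) asserting the new entry's default, assuming `KyuubiConf(false)` and `get` behave as in existing KyuubiConf tests:
    
    ```scala
    import org.apache.kyuubi.config.KyuubiConf
    
    // A fresh conf without system defaults should fall back to the declared default.
    val conf = KyuubiConf(false)
    assert(conf.get(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS) == 10000L)
    ```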
    
    ---
    
    # Checklist 📝
    
    - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6377 from turboFei/check_is_DQL.
    
    Closes #6377
    
    da9c2a921 [Wang, Fei] ut
    04e20db5f [Wang, Fei] conf
    8f20ed84b [Wang, Fei] refine the check
    f558dcca5 [Wang, Fei] ut
    c81340333 [Wang, Fei] DQL
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 3439ea03f283ab93c47ab8ffcc54231a409dc239)
    Signed-off-by: Wang, Fei <[email protected]>
---
 docs/configuration/settings.md                     |  1 +
 .../engine/spark/operation/ExecuteStatement.scala  |  8 +++--
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      | 38 ++++++++++++++--------
 .../spark/sql/kyuubi/SparkDatasetHelperSuite.scala | 11 +++++++
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  8 +++++
 5 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 56639c271..49d56810b 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -406,6 +406,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.result.max.rows                 | 0                             | Max rows of Spark query results. Rows exceeding the limit would be ignored. Set this value to 0 to disable the max rows limit. [...]
 | kyuubi.operation.result.saveToFile.dir           | /tmp/kyuubi/tmp_kyuubi_result | The Spark query result save dir; it should be publicly accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query result is deleted when the query finishes. [...]
 | kyuubi.operation.result.saveToFile.enabled       | false                         | The switch for saving Spark query results to file. [...]
+| kyuubi.operation.result.saveToFile.minRows       | 10000                         | The minimum row threshold for saving Spark query results to file, default value is 10000. [...]
 | kyuubi.operation.result.saveToFile.minSize       | 209715200                     | The minimum size threshold for saving Spark query results to file, default value is 200 MB. We use Spark's `EstimationUtils#getSizePerRow` to estimate the output size of the execution plan. [...]
 | kyuubi.operation.scheduler.pool                  | &lt;undefined&gt;             | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. [...]
 | kyuubi.operation.spark.listener.enabled          | true                          | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. [...]
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index bf68f18f0..782b07c7d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -172,10 +172,12 @@ class ExecuteStatement(
       })
     } else {
       val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
-      val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+      val resultSaveSizeThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+      val resultSaveRowsThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, spark)
       if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
           resultMaxRows,
-          resultSaveThreshold,
+          resultSaveSizeThreshold,
+          resultSaveRowsThreshold,
           result)) {
         saveFileName =
           Some(
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 7af51abfe..082c9ff70 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -301,22 +301,33 @@ object SparkDatasetHelper extends Logging {
       case _ => None
     }
 
-  def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
-    if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
+  def shouldSaveResultToFs(
+      resultMaxRows: Int,
+      minSize: Long,
+      minRows: Long,
+      result: DataFrame): Boolean = {
+    if (isCommandExec(result) ||
+      (resultMaxRows > 0 && resultMaxRows < minRows) ||
+      result.queryExecution.optimizedPlan.stats.rowCount.getOrElse(
+        BigInt(Long.MaxValue)) < minRows) {
       return false
     }
-    val finalLimit = optimizedPlanLimit(result.queryExecution) match {
-      case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
-      case Some(limit) => limit
-      case None => resultMaxRows
+    val finalLimit: Option[Long] = optimizedPlanLimit(result.queryExecution) match {
+      case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows))
+      case Some(limit) => Some(limit)
+      case None if resultMaxRows > 0 => Some(resultMaxRows)
+      case _ => None
     }
-    lazy val stats = if (finalLimit > 0) {
-      finalLimit * EstimationUtils.getSizePerRow(
-        result.queryExecution.executedPlan.output)
-    } else {
-      result.queryExecution.optimizedPlan.stats.sizeInBytes
+    if (finalLimit.exists(_ < minRows)) {
+      return false
     }
-    lazy val colSize =
+    val sizeInBytes = result.queryExecution.optimizedPlan.stats.sizeInBytes
+    val stats = finalLimit.map { limit =>
+      val estimateSize =
+        limit * EstimationUtils.getSizePerRow(result.queryExecution.executedPlan.output)
+      estimateSize min sizeInBytes
+    }.getOrElse(sizeInBytes)
+    val colSize =
       if (result == null || result.schema.isEmpty) {
         0
       } else {
@@ -325,7 +336,8 @@ object SparkDatasetHelper extends Logging {
     minSize > 0 && colSize > 0 && stats >= minSize
   }
 
-  private def isCommandExec(nodeName: String): Boolean = {
+  def isCommandExec(result: DataFrame): Boolean = {
+    val nodeName = result.queryExecution.executedPlan.getClass.getName
     nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
     nodeName == "org.apache.spark.sql.execution.CommandResultExec"
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
index 8ac00e602..791cb12b9 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
@@ -42,4 +42,15 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine {
         spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
     }
   }
+
+  test("isCommandExec") {
+    var query = "set"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "explain set"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "show tables"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "select * from VALUES(1),(2),(3),(4) AS t(id)"
+    assert(!SparkDatasetHelper.isCommandExec(spark.sql(query)))
+  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 5dc42c3dd..909b07802 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1981,6 +1981,14 @@ object KyuubiConf {
       .checkValue(_ > 0, "must be positive value")
       .createWithDefault(200 * 1024 * 1024)
 
+  val OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS: ConfigEntry[Long] =
+    buildConf("kyuubi.operation.result.saveToFile.minRows")
+      .doc("The minRows of Spark result save to file, default value is 10000.")
+      .version("1.9.1")
+      .longConf
+      .checkValue(_ > 0, "must be positive value")
+      .createWithDefault(10000)
+
   val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
     buildConf("kyuubi.operation.incremental.collect")
       .internal
