This is an automated email from the ASF dual-hosted git repository.

fchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c029f991 [KYUUBI #5377] Spark engine query result save to file
4c029f991 is described below

commit 4c029f991f3eb189912efe634da9472401da7ff0
Author: senmiaoliu <[email protected]>
AuthorDate: Wed Dec 13 16:03:11 2023 +0800

    [KYUUBI #5377] Spark engine query result save to file
    
    ### _Why are the changes needed?_
    
    close #5377
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    NO
    
    Closes #5591 from lsm1/branch-kyuubi-5377.
    
    Closes #5377
    
    9d1a18c15 [senmiaoliu] ignore empty file
    3c70a1e9e [LSM] fix doc
    73d3c3abf [senmiaoliu] fix style and add some comment
    80e1f0d70 [senmiaoliu] Close orc fetchOrcStatement and remove result save 
file when ExecuteStatement close
    42634a1d0 [senmiaoliu] fix style
    979125d9b [senmiaoliu] fix style
    1dc07a553 [senmiaoliu] spark engine save into hdfs file
    
    Lead-authored-by: senmiaoliu <[email protected]>
    Co-authored-by: LSM <[email protected]>
    Signed-off-by: Fu Chen <[email protected]>
---
 docs/configuration/settings.md                     |   3 +
 .../kyuubi/engine/spark/SparkSQLEngine.scala       |  16 +++
 .../engine/spark/operation/ExecuteStatement.scala  |  38 +++++-
 .../engine/spark/operation/FetchOrcStatement.scala | 151 +++++++++++++++++++++
 .../spark/session/SparkSQLSessionManager.scala     |   8 ++
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      |  29 ++++
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  27 ++++
 7 files changed, 271 insertions(+), 1 deletion(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 6d46eeff2..212208b83 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -395,6 +395,9 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.result.arrow.timestampAsString  | false                     
                                                      | When true, arrow-based 
rowsets will convert columns of type timestamp to strings for transmission.     
                                                                                
                                                                                
                                                                                
               [...]
 | kyuubi.operation.result.format                   | thrift                    
                                                      | Specify the result 
format, available configs are: <ul> <li>THRIFT: the result will convert to TRow 
at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow 
at the executor side before collecting by the driver, and deserialized at the 
client side. note that it only takes effect for kyuubi-hive-jdbc clients 
now.</li></ul>              [...]
 | kyuubi.operation.result.max.rows                 | 0                         
                                                      | Max rows of Spark query 
results. Rows exceeding the limit would be ignored. By setting this value to 0 
to disable the max rows limit.                                                  
                                                                                
                                                                                
               [...]
+| kyuubi.operation.result.saveToFile.dir           | 
/tmp/kyuubi/tmp_kyuubi_result                                                   
| The Spark query result save dir, it should be a public accessible to every 
engine. Results are saved in ORC format, and the directory structure is 
`/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query 
result will delete when query finished.                                         
                                                   [...]
+| kyuubi.operation.result.saveToFile.enabled       | false                     
                                                      | The switch for Spark 
query result save to file.                                                      
                                                                                
                                                                                
                                                                                
                 [...]
+| kyuubi.operation.result.saveToFile.minSize       | 209715200                 
                                                      | The minSize of Spark 
result save to file, default value is 200 MB.we use spark's 
`EstimationUtils#getSizePerRowestimate` to estimate the output size of the 
execution plan.                                                                 
                                                                                
                                          [...]
 | kyuubi.operation.scheduler.pool                  | &lt;undefined&gt;         
                                                      | The scheduler pool of 
job. Note that, this config should be used after changing Spark config 
spark.scheduler.mode=FAIR.                                                      
                                                                                
                                                                                
                         [...]
 | kyuubi.operation.spark.listener.enabled          | true                      
                                                      | When set to true, Spark 
engine registers an SQLOperationListener before executing the statement, 
logging a few summary statistics when each stage completes.                     
                                                                                
                                                                                
                     [...]
 | kyuubi.operation.status.polling.timeout          | PT5S                      
                                                      | Timeout(ms) for long 
polling asynchronous running sql query's status                                 
                                                                                
                                                                                
                                                                                
                 [...]
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index bf7be14b8..3dc771e6c 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -26,6 +26,7 @@ import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
 import com.google.common.annotations.VisibleForTesting
+import org.apache.hadoop.fs.Path
 import org.apache.spark.{ui, SparkConf}
 import org.apache.spark.kyuubi.{SparkContextHelper, 
SparkSQLEngineEventListener, SparkSQLEngineListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
@@ -37,6 +38,7 @@ import org.apache.kyuubi.config.{KyuubiConf, 
KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf._
 import 
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, 
KYUUBI_ENGINE_URL}
 import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, 
currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, 
SparkEventHandlerRegister}
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
@@ -58,6 +60,7 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
 
   @volatile private var lifetimeTerminatingChecker: 
Option[ScheduledExecutorService] = None
   @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
+  @volatile private var engineSavePath: Option[String] = None
 
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this)
@@ -87,6 +90,15 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
       maxInitTimeout > 0) {
       startFastFailChecker(maxInitTimeout)
     }
+
+    if 
(backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
+      val savePath = 
backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
+      engineSavePath = Some(s"$savePath/$engineId")
+      val path = new Path(engineSavePath.get)
+      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      fs.mkdirs(path)
+      fs.deleteOnExit(path)
+    }
   }
 
   override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@@ -102,6 +114,10 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
         exec,
         Duration(60, TimeUnit.SECONDS))
     })
+    engineSavePath.foreach { p =>
+      val path = new Path(p)
+      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, 
true)
+    }
   }
 
   def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, 
true)) {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index acb49d65e..d1a213067 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -19,14 +19,16 @@ package org.apache.kyuubi.engine.spark.operation
 
 import java.util.concurrent.RejectedExecutionException
 
+import scala.Array._
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, 
OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, 
OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, 
IterableFetchIterator, OperationHandle, OperationState}
@@ -46,6 +48,8 @@ class ExecuteStatement(
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
   override protected def supportProgress: Boolean = true
 
+  private var fetchOrcStatement: Option[FetchOrcStatement] = None
+  private var saveFileName: Option[String] = None
   override protected def resultSchema: StructType = {
     if (result == null || result.schema.isEmpty) {
       new StructType().add("Result", "string")
@@ -64,6 +68,15 @@ class ExecuteStatement(
     OperationLog.removeCurrentOperationLog()
   }
 
+  /**
+   * Closes the operation, then releases the ORC reader (if one was created) and recursively
+   * deletes the saved result location (if one was written).
+   *
+   * Each cleanup step runs even when an earlier step throws, so a failure while closing the
+   * operation or the reader cannot leave the saved result files behind.
+   */
+  override def close(): Unit = {
+    try {
+      super.close()
+    } finally {
+      try {
+        fetchOrcStatement.foreach(_.close())
+      } finally {
+        saveFileName.foreach { savedLocation =>
+          val path = new Path(savedLocation)
+          val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+          // recursive = true: remove the location together with everything below it
+          fs.delete(path, true)
+        }
+      }
+    }
+  }
+
   protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = 
{
     resultDF.toLocalIterator().asScala
   }
@@ -158,6 +171,29 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = 
incrementalCollectResult(resultDF)
       })
     } else {
+      val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, 
spark)
+      lazy val resultSaveThreshold = 
getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+      if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
+          resultMaxRows,
+          resultSaveThreshold,
+          result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = 
session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
+        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        // Rename all col name to avoid duplicate columns
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        // df.write will introduce an extra shuffle for the outermost limit, 
and hurt performance
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)
+        } else {
+          result.toDF(colName: _*).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)
+        }
+        info(s"Save result to $saveFileName")
+        fetchOrcStatement = Some(new FetchOrcStatement(spark))
+        return fetchOrcStatement.get.getIterator(saveFileName.get, 
resultSchema)
+      }
       val internalArray = if (resultMaxRows <= 0) {
         info("Execute in full collect mode")
         fullCollectResult(resultDF)
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
new file mode 100644
index 000000000..861539b95
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import 
org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+import org.apache.kyuubi.util.reflect.DynConstructors
+
+/**
+ * Reads a query result stored as ORC files under a directory and exposes it as a
+ * [[FetchIterator]] of [[Row]]s.
+ *
+ * @param spark session whose Hadoop configuration is used to resolve the file system
+ */
+class FetchOrcStatement(spark: SparkSession) {
+
+  // Assigned by getIterator; stays null until then, so close() must tolerate null.
+  var orcIter: OrcFileIterator = _
+
+  /**
+   * Builds a row iterator over the non-empty `.orc` files located directly under `path`.
+   *
+   * @param path directory that holds the saved ORC files (listed non-recursively)
+   * @param orcSchema schema used to deserialize each ORC record and convert it to a [[Row]]
+   * @return a fetch iterator over the rows of all listed files
+   */
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      // Keep data files only: entries without the .orc suffix and zero-length files are skipped.
+      if (file.getPath.getName.endsWith(".orc") && file.getLen > 0) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    // 0 until orcSchema.size, handed to the deserializer as its column-id array.
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    orcIter = new OrcFileIterator(list)
+    val iterRow = orcIter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  /**
+   * Closes the underlying ORC readers. Does nothing when `getIterator` has not created an
+   * iterator yet (for example because it was never called or failed early).
+   */
+  def close(): Unit = {
+    Option(orcIter).foreach(_.close())
+  }
+
+  /**
+   * Instantiates an `OrcDeserializer` reflectively, because its constructor signature differs
+   * between Spark versions: `(StructType, Array[Int])` from 3.2 on (SPARK-34535) and
+   * `(StructType, StructType, Array[Int])` before.
+   *
+   * @throws KyuubiException when the deserializer cannot be created
+   */
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
+        // SPARK-34535 changed the constructor signature of OrcDeserializer
+        DynConstructors.builder()
+          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            orcSchema,
+            colId)
+      } else {
+        DynConstructors.builder()
+          .impl(
+            classOf[OrcDeserializer],
+            classOf[StructType],
+            classOf[StructType],
+            classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            new StructType,
+            orcSchema,
+            colId)
+      }
+    } catch {
+      // Only wrap non-fatal failures; fatal JVM errors (e.g. OutOfMemoryError) propagate as-is.
+      case scala.util.control.NonFatal(e) =>
+        throw new KyuubiException("Failed to create OrcDeserializer", e)
+    }
+  }
+}
+
+/**
+ * Iterates over the records of several ORC files as one continuous sequence.
+ *
+ * A record reader is created for every file when this iterator is constructed.
+ *
+ * @param fileList ORC files to read, consumed in list order
+ */
+class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
+
+  private val iters = fileList.map(x => getOrcFileIterator(x))
+
+  // Index of the file currently being read; equals iters.size once every file is consumed.
+  var idx = 0
+
+  /**
+   * Returns true when another record is available. Exhausted or empty files are closed and
+   * skipped. Returns false, rather than failing with an index error, when the file list is
+   * empty or when called again after the last file has been consumed.
+   */
+  override def hasNext: Boolean = {
+    while (idx < iters.size && !iters(idx).hasNext) {
+      iters(idx).close()
+      idx += 1
+    }
+    idx < iters.size
+  }
+
+  /**
+   * Returns the next record, moving on to the following file when the current one is
+   * exhausted.
+   *
+   * @throws NoSuchElementException when no record is left in any file
+   */
+  override def next(): OrcStruct = {
+    if (!hasNext) {
+      throw new NoSuchElementException("No more records in the ORC result files")
+    }
+    iters(idx).next()
+  }
+
+  /** Closes the readers of all files, including those not read yet. */
+  def close(): Unit = {
+    iters.foreach(_.close())
+  }
+
+  /** Opens a record reader spanning the whole of `file` and wraps it as an iterator. */
+  private def getOrcFileIterator(file: LocatedFileStatus): RecordReaderIterator[OrcStruct] = {
+    val orcRecordReader = {
+      // A single split covering the entire file, with no host locality hints.
+      val split =
+        new FileSplit(file.getPath, 0, file.getLen, Array.empty[String])
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      // NOTE(review): this uses a fresh default Configuration, not the Spark session's
+      // Hadoop configuration - confirm that is sufficient for the target file system.
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(new Configuration(), attemptId)
+      val oif = new OrcInputFormat[OrcStruct]
+      oif.createRecordReader(split, hadoopAttemptContext)
+    }
+    new RecordReaderIterator[OrcStruct](orcRecordReader)
+  }
+}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index c64fb52c0..aab2d5106 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark.session
 
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.api.python.KyuubiPythonGatewayServer
 import org.apache.spark.sql.SparkSession
 
@@ -28,6 +29,7 @@ import 
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel._
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
 import org.apache.kyuubi.session._
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: 
SparkSession)
       info("Session stopped due to shared level is Connection.")
       stopSession()
     }
+    if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
+      val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
+        s"$engineId/${sessionHandle.identifier}")
+      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, 
true)
+      info(s"Delete session result file $path")
+    }
   }
 
   private def stopSession(): Unit = {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index e84312268..16f597cdb 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -25,6 +25,7 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, 
SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.HiveResult
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -293,4 +294,32 @@ object SparkDatasetHelper extends Logging {
     val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }
+
+  /**
+   * Decides whether a query result is large enough to be written to files.
+   *
+   * Returns false for a null result, a result without columns, a non-positive `minSize`, or
+   * a command execution plan. Otherwise the result size is estimated and compared with
+   * `minSize`: when a row limit applies (the plan's `CollectLimitExec` limit, else
+   * `resultMaxRows`) the estimate is `limit * size-per-row`, otherwise it is the optimized
+   * plan's `sizeInBytes` statistic.
+   *
+   * @param resultMaxRows row cap applied to the result; values <= 0 mean no cap here
+   * @param minSize minimum estimated size, in bytes, for saving to files
+   * @param result the query result to inspect
+   */
+  def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
+    // Guard before touching the plan: the null check must precede any dereference of result.
+    if (result == null || result.schema.isEmpty || minSize <= 0) {
+      false
+    } else {
+      val executedPlan = result.queryExecution.executedPlan
+      // Pass the fully qualified class name: nodeName is the simple class name without the
+      // "Exec" suffix and would never equal the names isCommandExec compares against.
+      if (isCommandExec(executedPlan.getClass.getName)) {
+        false
+      } else {
+        val limit = executedPlan match {
+          case collectLimit: CollectLimitExec => collectLimit.limit
+          case _ => resultMaxRows
+        }
+        val estimatedSize = if (limit > 0) {
+          limit * EstimationUtils.getSizePerRow(executedPlan.output)
+        } else {
+          result.queryExecution.optimizedPlan.stats.sizeInBytes
+        }
+        estimatedSize >= minSize
+      }
+    }
+  }
+
+  /**
+   * Tells whether a physical plan node is a command execution node.
+   *
+   * Accepts either the node's fully qualified class name or its `nodeName`. Spark's
+   * `TreeNode.nodeName` is the simple class name with a trailing "Exec" removed, so matching
+   * the fully qualified names alone would never succeed for a `nodeName` argument.
+   *
+   * @param nodeName fully qualified class name or `nodeName` of the plan node
+   */
+  private def isCommandExec(nodeName: String): Boolean = {
+    nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
+    nodeName == "org.apache.spark.sql.execution.CommandResultExec" ||
+    nodeName == "ExecutedCommand" ||
+    nodeName == "CommandResult"
+  }
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index be525f4e8..00c1b8995 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1941,6 +1941,33 @@ object KyuubiConf {
       .intConf
       .createWithDefault(0)
 
+  // Feature flag for saving Spark query results to files; disabled by default.
+  val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.saveToFile.enabled")
+      .doc("The switch for Spark query result save to file.")
+      .version("1.9.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  // Base directory for saved results; the engine id, session id and statement id are
+  // appended to it to form the location of each statement's result.
+  val OPERATION_RESULT_SAVE_TO_FILE_DIR: ConfigEntry[String] =
+    buildConf("kyuubi.operation.result.saveToFile.dir")
+      .doc("The Spark query result save dir, it should be a public accessible 
to every engine." +
+        " Results are saved in ORC format, and the directory structure is" +
+        " 
`/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`." +
+        " Each query result will delete when query finished.")
+      .version("1.9.0")
+      .stringConf
+      .createWithDefault("/tmp/kyuubi/tmp_kyuubi_result")
+
+  // Minimum estimated result size, in bytes, for saving a result to files; must be > 0.
+  // The default 200 * 1024 * 1024 equals 209715200.
+  // NOTE(review): the first two doc fragments are concatenated without a separating space,
+  // so the rendered text reads "200 MB.we use" - fixing it presumably also requires
+  // updating docs/configuration/settings.md; confirm before changing the string.
+  val OPERATION_RESULT_SAVE_TO_FILE_MINSIZE: ConfigEntry[Long] =
+    buildConf("kyuubi.operation.result.saveToFile.minSize")
+      .doc("The minSize of Spark result save to file, default value is 200 
MB." +
+        "we use spark's `EstimationUtils#getSizePerRowestimate` to estimate" +
+        " the output size of the execution plan.")
+      .version("1.9.0")
+      .longConf
+      .checkValue(_ > 0, "must be positive value")
+      .createWithDefault(200 * 1024 * 1024)
+
   val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
     buildConf("kyuubi.operation.incremental.collect")
       .internal

Reply via email to