This is an automated email from the ASF dual-hosted git repository.
fchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4c029f991 [KYUUBI #5377] Spark engine query result save to file
4c029f991 is described below
commit 4c029f991f3eb189912efe634da9472401da7ff0
Author: senmiaoliu <[email protected]>
AuthorDate: Wed Dec 13 16:03:11 2023 +0800
[KYUUBI #5377] Spark engine query result save to file
### _Why are the changes needed?_
close #5377
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
NO
Closes #5591 from lsm1/branch-kyuubi-5377.
Closes #5377
9d1a18c15 [senmiaoliu] ignore empty file
3c70a1e9e [LSM] fix doc
73d3c3abf [senmiaoliu] fix style and add some comment
80e1f0d70 [senmiaoliu] Close orc fetchOrcStatement and remove result save file when ExecuteStatement close
42634a1d0 [senmiaoliu] fix style
979125d9b [senmiaoliu] fix style
1dc07a553 [senmiaoliu] spark engine save into hdfs file
Lead-authored-by: senmiaoliu <[email protected]>
Co-authored-by: LSM <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
---
docs/configuration/settings.md | 3 +
.../kyuubi/engine/spark/SparkSQLEngine.scala | 16 +++
.../engine/spark/operation/ExecuteStatement.scala | 38 +++++-
.../engine/spark/operation/FetchOrcStatement.scala | 151 +++++++++++++++++++++
.../spark/session/SparkSQLSessionManager.scala | 8 ++
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 29 ++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 27 ++++
7 files changed, 271 insertions(+), 1 deletion(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 6d46eeff2..212208b83 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -395,6 +395,9 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.operation.result.arrow.timestampAsString | false | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. [...]
| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul> [...]
| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. [...]
+| kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The directory where Spark query results are saved; it should be publicly accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query result is deleted when the query finishes. [...]
+| kyuubi.operation.result.saveToFile.enabled | false | The switch for saving Spark query results to file. [...]
+| kyuubi.operation.result.saveToFile.minSize | 209715200 | The minimum size of Spark results to save to file; the default value is 200 MB. We use Spark's `EstimationUtils#getSizePerRow` to estimate the output size of the execution plan. [...]
| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. [...]
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. [...]
| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status [...]
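A minimal `kyuubi-defaults.conf` snippet that enables the feature might look like the following; the directory and threshold values are illustrative examples, not recommendations:

```properties
# Write large Spark query results to ORC files instead of collecting them on the driver
kyuubi.operation.result.saveToFile.enabled=true
# The directory must be reachable by every Spark engine, e.g. an HDFS path
kyuubi.operation.result.saveToFile.dir=hdfs:///tmp/kyuubi/tmp_kyuubi_result
# Only results whose estimated size is at least 200 MB (the default) are saved to file
kyuubi.operation.result.saveToFile.minSize=209715200
```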
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index bf7be14b8..3dc771e6c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -26,6 +26,7 @@ import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import com.google.common.annotations.VisibleForTesting
+import org.apache.hadoop.fs.Path
import org.apache.spark.{ui, SparkConf}
import org.apache.spark.kyuubi.{SparkContextHelper, SparkSQLEngineEventListener, SparkSQLEngineListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
@@ -37,6 +38,7 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
@@ -58,6 +60,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
+ @volatile private var engineSavePath: Option[String] = None
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
@@ -87,6 +90,15 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
maxInitTimeout > 0) {
startFastFailChecker(maxInitTimeout)
}
+
+ if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
+ val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
+ engineSavePath = Some(s"$savePath/$engineId")
+ val path = new Path(engineSavePath.get)
+ val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ fs.mkdirs(path)
+ fs.deleteOnExit(path)
+ }
}
override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@@ -102,6 +114,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
exec,
Duration(60, TimeUnit.SECONDS))
})
+ engineSavePath.foreach { p =>
+ val path = new Path(p)
+ path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+ }
}
def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, true)) {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index acb49d65e..d1a213067 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -19,14 +19,16 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.RejectedExecutionException
+import scala.Array._
import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -46,6 +48,8 @@ class ExecuteStatement(
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def supportProgress: Boolean = true
+ private var fetchOrcStatement: Option[FetchOrcStatement] = None
+ private var saveFileName: Option[String] = None
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
new StructType().add("Result", "string")
@@ -64,6 +68,15 @@ class ExecuteStatement(
OperationLog.removeCurrentOperationLog()
}
+ override def close(): Unit = {
+ super.close()
+ fetchOrcStatement.foreach(_.close())
+ saveFileName.foreach { p =>
+ val path = new Path(p)
+ path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+ }
+ }
+
protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = {
resultDF.toLocalIterator().asScala
}
@@ -158,6 +171,29 @@ class ExecuteStatement(
override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
})
} else {
+ val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
+ lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+ if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
+ resultMaxRows,
+ resultSaveThreshold,
+ result)) {
+ val sessionId = session.handle.identifier.toString
+ val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
+ saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+ // Rename all columns to avoid duplicate column names
+ val colName = range(0, result.schema.size).map(x => "col" + x)
+ // df.write will introduce an extra shuffle for the outermost limit, and hurt performance
+ if (resultMaxRows > 0) {
+ result.toDF(colName: _*).limit(resultMaxRows).write
+ .option("compression", "zstd").format("orc").save(saveFileName.get)
+ } else {
+ result.toDF(colName: _*).write
+ .option("compression", "zstd").format("orc").save(saveFileName.get)
+ }
+ info(s"Save result to $saveFileName")
+ fetchOrcStatement = Some(new FetchOrcStatement(spark))
+ return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
+ }
val internalArray = if (resultMaxRows <= 0) {
info("Execute in full collect mode")
fullCollectResult(resultDF)
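A rough standalone sketch of the write path above (this is not Kyuubi code; the object name and the local output path are invented for illustration): duplicate output column names would fail the ORC writer, so the result columns are renamed positionally before being written as zstd-compressed ORC.

```scala
import org.apache.spark.sql.SparkSession

object SaveResultSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("save-result-sketch").getOrCreate()
    // A projection that yields duplicate column names, as ad-hoc SQL easily can.
    val df = spark.sql("SELECT 1 AS id, 1 AS id")
    // Rename columns positionally (col0, col1, ...) so the ORC writer accepts the schema,
    // then write zstd-compressed ORC files, mirroring the ExecuteStatement change above.
    val renamed = df.toDF(df.schema.indices.map(i => s"col$i"): _*)
    renamed.write.option("compression", "zstd").format("orc").save("/tmp/kyuubi_result_sketch")
    spark.stop()
  }
}
```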
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
new file mode 100644
index 000000000..861539b95
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+import org.apache.kyuubi.util.reflect.DynConstructors
+
+class FetchOrcStatement(spark: SparkSession) {
+
+ var orcIter: OrcFileIterator = _
+ def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+ val conf = spark.sparkContext.hadoopConfiguration
+ val savePath = new Path(path)
+ val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+ val list = new ListBuffer[LocatedFileStatus]
+ while (fsIterator.hasNext) {
+ val file = fsIterator.next()
+ if (file.getPath.getName.endsWith(".orc") && file.getLen > 0) {
+ list += file
+ }
+ }
+ val toRowConverter: InternalRow => Row = {
+ CatalystTypeConverters.createToScalaConverter(orcSchema)
+ .asInstanceOf[InternalRow => Row]
+ }
+ val colId = range(0, orcSchema.size)
+ val fullSchema = orcSchema.map(f =>
+ AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ val deserializer = getOrcDeserializer(orcSchema, colId)
+ orcIter = new OrcFileIterator(list)
+ val iterRow = orcIter.map(value => unsafeProjection(deserializer.deserialize(value)))
+ .map(value => toRowConverter(value))
+ new IterableFetchIterator[Row](iterRow.toIterable)
+ }
+
+ def close(): Unit = {
+ orcIter.close()
+ }
+
+ private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+ try {
+ if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
+ // SPARK-34535 changed the constructor signature of OrcDeserializer
+ DynConstructors.builder()
+ .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
+ .build[OrcDeserializer]()
+ .newInstance(
+ orcSchema,
+ colId)
+ } else {
+ DynConstructors.builder()
+ .impl(
+ classOf[OrcDeserializer],
+ classOf[StructType],
+ classOf[StructType],
+ classOf[Array[Int]])
+ .build[OrcDeserializer]()
+ .newInstance(
+ new StructType,
+ orcSchema,
+ colId)
+ }
+ } catch {
+ case e: Throwable =>
+ throw new KyuubiException("Failed to create OrcDeserializer", e)
+ }
+ }
+}
+
+class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
+
+ private val iters = fileList.map(x => getOrcFileIterator(x))
+
+ var idx = 0
+
+ override def hasNext: Boolean = {
+ val hasNext = iters(idx).hasNext
+ if (!hasNext) {
+ iters(idx).close()
+ idx += 1
+ // skip empty file
+ while (idx < iters.size) {
+ if (iters(idx).hasNext) {
+ return true
+ } else {
+ iters(idx).close()
+ idx = idx + 1
+ }
+ }
+ }
+ hasNext
+ }
+
+ override def next(): OrcStruct = {
+ iters(idx).next()
+ }
+
+ def close(): Unit = {
+ iters.foreach(_.close())
+ }
+
+ private def getOrcFileIterator(file: LocatedFileStatus): RecordReaderIterator[OrcStruct] = {
+ val orcRecordReader = {
+ val split =
+ new FileSplit(file.getPath, 0, file.getLen, Array.empty[String])
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(new Configuration(), attemptId)
+ val oif = new OrcInputFormat[OrcStruct]
+ oif.createRecordReader(split, hadoopAttemptContext)
+ }
+ new RecordReaderIterator[OrcStruct](orcRecordReader)
+ }
+}
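FetchOrcStatement streams the saved files back through Hadoop's OrcInputFormat; for ad-hoc debugging the same files can also be inspected with Spark's public ORC reader. A hypothetical spark-shell snippet (the path placeholders follow the documented `dir/engineId/sessionId/statementId` layout and must be replaced with real identifiers):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical inspection snippet, not part of the patch.
val spark = SparkSession.builder().master("local[*]").appName("inspect-saved-result").getOrCreate()
val resultDir = "/tmp/kyuubi/tmp_kyuubi_result/<engineId>/<sessionId>/<statementId>"
val saved = spark.read.format("orc").load(resultDir)
saved.printSchema()            // columns show up as col0, col1, ... after the positional rename
saved.show(10, truncate = false)
```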
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index c64fb52c0..aab2d5106 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark.session
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import org.apache.hadoop.fs.Path
import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.SparkSession
@@ -28,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
info("Session stopped due to shared level is Connection.")
stopSession()
}
+ if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
+ val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
+ s"$engineId/${sessionHandle.identifier}")
+ path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+ info(s"Delete session result file $path")
+ }
}
private def stopSession(): Unit = {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index e84312268..16f597cdb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -25,6 +25,7 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -293,4 +294,32 @@ object SparkDatasetHelper extends Logging {
val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
}
+
+ def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
+ if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
+ return false
+ }
+ lazy val limit = result.queryExecution.executedPlan match {
+ case collectLimit: CollectLimitExec => collectLimit.limit
+ case _ => resultMaxRows
+ }
+ lazy val stats = if (limit > 0) {
+ limit * EstimationUtils.getSizePerRow(result.queryExecution.executedPlan.output)
+ } else {
+ result.queryExecution.optimizedPlan.stats.sizeInBytes
+ }
+ lazy val colSize =
+ if (result == null || result.schema.isEmpty) {
+ 0
+ } else {
+ result.schema.size
+ }
+ minSize > 0 && colSize > 0 && stats >= minSize
+ }
+
+ private def isCommandExec(nodeName: String): Boolean = {
+ nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
+ nodeName == "org.apache.spark.sql.execution.CommandResultExec"
+ }
}
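For intuition about how `shouldSaveResultToFs` interacts with the `minSize` threshold, here is a back-of-the-envelope check with made-up numbers; the per-row size below is an assumption, while in Kyuubi it comes from Spark's plan-level size estimation:

```scala
// Sketch of the saveToFile.minSize decision with assumed figures.
val minSize = 200L * 1024 * 1024       // kyuubi.operation.result.saveToFile.minSize default (209715200)
val estimatedBytesPerRow = 100L        // assumed per-row estimate; Kyuubi derives it from the plan output
val effectiveLimit = 3000000L          // rows the query is expected to return (limit or resultMaxRows)
val estimatedOutput = effectiveLimit * estimatedBytesPerRow
val saveToFile = estimatedOutput >= minSize  // 300000000 >= 209715200, so results are written to ORC files
println(saveToFile)
```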
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index be525f4e8..00c1b8995 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1941,6 +1941,33 @@ object KyuubiConf {
.intConf
.createWithDefault(0)
+ val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+ buildConf("kyuubi.operation.result.saveToFile.enabled")
+ .doc("The switch for Spark query result save to file.")
+ .version("1.9.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ val OPERATION_RESULT_SAVE_TO_FILE_DIR: ConfigEntry[String] =
+ buildConf("kyuubi.operation.result.saveToFile.dir")
+ .doc("The Spark query result save dir, it should be a public accessible
to every engine." +
+ " Results are saved in ORC format, and the directory structure is" +
+ "
`/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`." +
+ " Each query result will delete when query finished.")
+ .version("1.9.0")
+ .stringConf
+ .createWithDefault("/tmp/kyuubi/tmp_kyuubi_result")
+
+ val OPERATION_RESULT_SAVE_TO_FILE_MINSIZE: ConfigEntry[Long] =
+ buildConf("kyuubi.operation.result.saveToFile.minSize")
+ .doc("The minSize of Spark result save to file, default value is 200
MB." +
+ "we use spark's `EstimationUtils#getSizePerRowestimate` to estimate" +
+ " the output size of the execution plan.")
+ .version("1.9.0")
+ .longConf
+ .checkValue(_ > 0, "must be positive value")
+ .createWithDefault(200 * 1024 * 1024)
+
val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
buildConf("kyuubi.operation.incremental.collect")
.internal