This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 91840d439 [KYUUBI #5377][FOLLOWUP] Always try to cleanup session result path
91840d439 is described below
commit 91840d439c067b2b40c0bf801461a5d675685016
Author: Fei Wang <[email protected]>
AuthorDate: Sat Feb 3 17:46:58 2024 -0800
[KYUUBI #5377][FOLLOWUP] Always try to cleanup session result path
# :mag: Description
## Issue References 🔗
We should always try to clean up the session result path, since
`kyuubi.operation.result.saveToFile.enabled` can be enabled at runtime.
This pull request fixes #5377
## Describe Your Solution 🔧
Move result save path construction into `SparkSQLSessionManager`
(`getEngineResultSavePath`, `getSessionResultSavePath`,
`getOperationResultSavePath`), make the engine-level save path a `lazy val`,
and always attempt to delete the session result path in a `finally` block when
a session closes, logging rather than propagating any cleanup failure.
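At its core, the fix wraps the delete in a best-effort `finally` so cleanup no
longer depends on the flag's value at session open. Below is a minimal,
self-contained sketch of that close-time cleanup pattern; the object and method
names are hypothetical stand-ins, not Kyuubi's API:

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator
import scala.util.control.NonFatal

// Hypothetical stand-in for the pattern in this change, not Kyuubi's API:
// cleanup must not depend on whether saveToFile was enabled when the session
// opened, because the flag can be turned on while the session is running.
object SessionResultCleanup {
  def closeSession(sessionResultPath: Path)(closeWork: => Unit): Unit = {
    try closeWork
    finally {
      // Best-effort, always attempted; failures are logged, never rethrown.
      try {
        if (Files.exists(sessionResultPath)) {
          // Walk the tree and delete children before parents (reverse order).
          Files.walk(sessionResultPath)
            .sorted(Comparator.reverseOrder[Path]())
            .forEach(p => Files.delete(p))
        }
      } catch {
        case NonFatal(e) =>
          Console.err.println(s"Error cleaning session result path $sessionResultPath: $e")
      }
    }
  }
}
```

For example, `SessionResultCleanup.closeSession(java.nio.file.Paths.get("/tmp/results/engine-1/session-1")) { /* release session resources */ }`
removes the directory even when saving was switched on mid-session.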
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #5986 from turboFei/OPERATION_RESULT_SAVE_TO_FILE_runtime.
Closes #5377
5dfdf96c3 [Fei Wang] comments
895caec8e [Fei Wang] [KYUUBI #5377][FOLLOWUP] Always try to cleanup session result path
Authored-by: Fei Wang <[email protected]>
Signed-off-by: Fei Wang <[email protected]>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 21 +++++++-----
.../engine/spark/operation/ExecuteStatement.scala | 16 +++++----
.../spark/session/SparkSQLSessionManager.scala | 39 ++++++++++++++++++----
3 files changed, 53 insertions(+), 23 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 3dc771e6c..d2e738366 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -38,10 +38,9 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
import org.apache.kyuubi.engine.ShareLevel
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
@@ -60,7 +59,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
- @volatile private var engineSavePath: Option[String] = None
+ private lazy val engineSavePath =
+ backendService.sessionManager.asInstanceOf[SparkSQLSessionManager].getEngineResultSavePath()
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
@@ -92,9 +92,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
}
if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
- val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
- engineSavePath = Some(s"$savePath/$engineId")
- val path = new Path(engineSavePath.get)
+ val path = new Path(engineSavePath)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(path)
fs.deleteOnExit(path)
@@ -114,9 +112,14 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
exec,
Duration(60, TimeUnit.SECONDS))
})
- engineSavePath.foreach { p =>
- val path = new Path(p)
- path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+ try {
+ val path = new Path(engineSavePath)
+ val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ if (fs.exists(path)) {
+ fs.delete(path, true)
+ }
+ } catch {
+ case e: Throwable => error(s"Error cleaning engine result save path: $engineSavePath", e)
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 8b47e2075..bf68f18f0 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -172,14 +172,16 @@ class ExecuteStatement(
})
} else {
val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
- lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+ val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
resultMaxRows,
resultSaveThreshold,
result)) {
- val sessionId = session.handle.identifier.toString
- val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
- saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+ saveFileName =
+ Some(
+ session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
+ session.handle,
+ handle))
// Rename all col name to avoid duplicate columns
val colName = range(0, result.schema.size).map(x => "col" + x)
@@ -192,7 +194,7 @@ class ExecuteStatement(
result.toDF(colName: _*).write
.option("compression", codec).format("orc").save(saveFileName.get)
}
- info(s"Save result to $saveFileName")
+ info(s"Save result to ${saveFileName.get}")
fetchOrcStatement = Some(new FetchOrcStatement(spark))
return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index aab2d5106..b6768c697 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.spark.session
+import java.nio.file.Paths
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import org.apache.hadoop.fs.Path
@@ -24,13 +25,15 @@ import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{engineId, getSessionConf}
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
+import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.util.ThreadUtils
@@ -181,17 +184,25 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
} catch {
case e: KyuubiSQLException =>
warn(s"Error closing session ${sessionHandle}", e)
+ } finally {
+ if (getSessionConf(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE, spark)) {
+ val sessionSavePath = getSessionResultSavePath(sessionHandle)
+ try {
+ val path = new Path(sessionSavePath)
+ val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ if (fs.exists(path)) {
+ fs.delete(path, true)
+ info(s"Deleted session result path: $sessionSavePath")
+ }
+ } catch {
+ case e: Throwable => error(s"Error cleaning session result path: $sessionSavePath", e)
+ }
+ }
}
if (shareLevel == ShareLevel.CONNECTION) {
info("Session stopped due to shared level is Connection.")
stopSession()
}
- if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
- val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
- s"$engineId/${sessionHandle.identifier}")
- path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
- info(s"Delete session result file $path")
- }
}
private def stopSession(): Unit = {
@@ -199,4 +210,18 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
}
override protected def isServer: Boolean = false
+
+ private[spark] def getEngineResultSavePath(): String = {
+ Paths.get(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId).toString
+ }
+
+ private def getSessionResultSavePath(sessionHandle: SessionHandle): String = {
+ Paths.get(getEngineResultSavePath(), sessionHandle.identifier.toString).toString
+ }
+
+ private[spark] def getOperationResultSavePath(
+ sessionHandle: SessionHandle,
+ opHandle: OperationHandle): String = {
+ Paths.get(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString).toString
+ }
}