This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 91840d439 [KYUUBI #5377][FOLLOWUP] Always try to cleanup session result path
91840d439 is described below

commit 91840d439c067b2b40c0bf801461a5d675685016
Author: Fei Wang <[email protected]>
AuthorDate: Sat Feb 3 17:46:58 2024 -0800

    [KYUUBI #5377][FOLLOWUP] Always try to cleanup session result path
    
    # :mag: Description
    ## Issue References 🔗
    
    We should always try to clean up the session result path, because `kyuubi.operation.result.saveToFile.enabled` can be enabled at runtime.
    This pull request fixes #5377
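
    A toy illustration of the pitfall (plain Scala, not Kyuubi source): a flag snapshotted at engine initialization misses a later runtime toggle, so the cleanup decision has to read the current conf at close time.

        var saveToFileEnabled = false             // engine default at startup
        val snapshotAtInit = saveToFileEnabled    // what init-time gating saw

        saveToFileEnabled = true                  // user enables it mid-session

        if (snapshotAtInit) println("cleanup")    // old gating: never runs
        if (saveToFileEnabled) println("cleanup") // close-time check: runs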
    
    ## Describe Your Solution 🔧
    
    The result save paths are now built by `SparkSQLSessionManager` helpers
    (`getEngineResultSavePath`, `getSessionResultSavePath`, `getOperationResultSavePath`).
    On `closeSession`, deletion of the session result path is always attempted in a
    `finally` block, gated by the session-level `OPERATION_RESULT_SAVE_TO_FILE` conf
    rather than the engine-level value captured at startup. Engine stop likewise checks
    that the engine result path exists and logs cleanup failures instead of throwing.
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:

    If `kyuubi.operation.result.saveToFile.enabled` is enabled only after the engine
    starts, the session and engine result paths are left behind on session close and
    engine stop.

    #### Behavior With This Pull Request :tada:

    Cleanup of the session and engine result paths is always attempted, regardless of
    when the conf was enabled.
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #5986 from turboFei/OPERATION_RESULT_SAVE_TO_FILE_runtime.
    
    Closes #5377
    
    5dfdf96c3 [Fei Wang] comments
    895caec8e [Fei Wang] [KYUUBI #5377][FOLLOWUP] Always try to cleanup session result path
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: Fei Wang <[email protected]>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 21 +++++++-----
 .../engine/spark/operation/ExecuteStatement.scala  | 16 +++++----
 .../spark/session/SparkSQLSessionManager.scala     | 39 ++++++++++++++++++----
 3 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 3dc771e6c..d2e738366 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -38,10 +38,9 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
 import org.apache.kyuubi.engine.ShareLevel
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.RetryPolicies
@@ -60,7 +59,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
 
   @volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
   @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
-  @volatile private var engineSavePath: Option[String] = None
+  private lazy val engineSavePath =
+    backendService.sessionManager.asInstanceOf[SparkSQLSessionManager].getEngineResultSavePath()
 
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this)
@@ -92,9 +92,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }
 
     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
-      engineSavePath = Some(s"$savePath/$engineId")
-      val path = new Path(engineSavePath.get)
+      val path = new Path(engineSavePath)
       val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
       fs.mkdirs(path)
       fs.deleteOnExit(path)
@@ -114,9 +112,14 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
         exec,
         Duration(60, TimeUnit.SECONDS))
     })
-    engineSavePath.foreach { p =>
-      val path = new Path(p)
-      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+    try {
+      val path = new Path(engineSavePath)
+      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      if (fs.exists(path)) {
+        fs.delete(path, true)
+      }
+    } catch {
+      case e: Throwable => error(s"Error cleaning engine result save path: $engineSavePath", e)
     }
   }
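
As an aside, the stop-time cleanup added above follows a common best-effort pattern: check existence, delete recursively, and log instead of rethrowing, so a filesystem hiccup can never fail engine shutdown. A minimal standalone sketch using the Hadoop FileSystem API (the helper name and logging are illustrative, not Kyuubi source):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Best-effort recursive deletion of a result directory.
    def cleanupResultPath(pathStr: String, hadoopConf: Configuration): Unit = {
      try {
        val path = new Path(pathStr)
        val fs: FileSystem = path.getFileSystem(hadoopConf)
        if (fs.exists(path)) {  // a path that was never created is not an error
          fs.delete(path, true) // true = recursive
        }
      } catch {
        // Log and swallow: cleanup failures must not break engine stop.
        case e: Throwable => Console.err.println(s"Error cleaning $pathStr: $e")
      }
    }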
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 8b47e2075..bf68f18f0 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -172,14 +172,16 @@ class ExecuteStatement(
       })
     } else {
       val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
-      lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+      val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
       if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
           resultMaxRows,
           resultSaveThreshold,
           result)) {
-        val sessionId = session.handle.identifier.toString
-        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
-        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        saveFileName =
+          Some(
+            session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
+              session.handle,
+              handle))
         // Rename all col name to avoid duplicate columns
         val colName = range(0, result.schema.size).map(x => "col" + x)
 
@@ -192,7 +194,7 @@ class ExecuteStatement(
           result.toDF(colName: _*).write
             .option("compression", codec).format("orc").save(saveFileName.get)
         }
-        info(s"Save result to $saveFileName")
+        info(s"Save result to ${saveFileName.get}")
         fetchOrcStatement = Some(new FetchOrcStatement(spark))
         return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
       }
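
The branch above spills large result sets to ORC files and streams them back through FetchOrcStatement instead of holding rows on the driver. A self-contained, spark-shell style sketch of the same write/read round trip using only stock Spark APIs (the path and compression codec are placeholders; the real code derives them from Kyuubi confs):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val result = spark.range(0, 10).selectExpr("id", "id * 2 AS doubled")

    // Rename every column to col0, col1, ... so duplicate names cannot break
    // the write, mirroring the renaming done in ExecuteStatement.
    val colNames = Range(0, result.schema.size).map(i => "col" + i)
    val savePath = "/tmp/kyuubi-result-sketch" // placeholder path

    result.toDF(colNames: _*).write
      .option("compression", "zstd") // the real codec comes from a Kyuubi conf
      .format("orc")
      .save(savePath)

    // Rows are later fetched back from disk rather than from driver memory.
    spark.read.format("orc").load(savePath).show()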
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index aab2d5106..b6768c697 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.engine.spark.session
 
+import java.nio.file.Paths
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import org.apache.hadoop.fs.Path
@@ -24,13 +25,15 @@ import org.apache.spark.api.python.KyuubiPythonGatewayServer
 import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel._
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{engineId, getSessionConf}
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
+import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session._
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.kyuubi.util.ThreadUtils
@@ -181,17 +184,25 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
     } catch {
       case e: KyuubiSQLException =>
         warn(s"Error closing session ${sessionHandle}", e)
+    } finally {
+      if (getSessionConf(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE, spark)) {
+        val sessionSavePath = getSessionResultSavePath(sessionHandle)
+        try {
+          val path = new Path(sessionSavePath)
+          val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+          if (fs.exists(path)) {
+            fs.delete(path, true)
+            info(s"Deleted session result path: $sessionSavePath")
+          }
+        } catch {
+          case e: Throwable => error(s"Error cleaning session result path: $sessionSavePath", e)
+        }
+      }
     }
     if (shareLevel == ShareLevel.CONNECTION) {
       info("Session stopped due to shared level is Connection.")
       stopSession()
     }
-    if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
-        s"$engineId/${sessionHandle.identifier}")
-      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
-      info(s"Delete session result file $path")
-    }
   }
 
   private def stopSession(): Unit = {
@@ -199,4 +210,18 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
   }
 
   override protected def isServer: Boolean = false
+
+  private[spark] def getEngineResultSavePath(): String = {
+    Paths.get(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId).toString
+  }
+
+  private def getSessionResultSavePath(sessionHandle: SessionHandle): String = {
+    Paths.get(getEngineResultSavePath(), sessionHandle.identifier.toString).toString
+  }
+
+  private[spark] def getOperationResultSavePath(
+      sessionHandle: SessionHandle,
+      opHandle: OperationHandle): String = {
+    Paths.get(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString).toString
+  }
 }
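
Taken together, the three helpers define one nested layout, <OPERATION_RESULT_SAVE_TO_FILE_DIR>/<engineId>/<sessionId>/<operationId>, so engine stop, session close, and operation cleanup each own a single subtree. A quick illustration of the composition with java.nio.file.Paths (all identifiers made up):

    import java.nio.file.Paths

    val saveDir = "/tmp/kyuubi/results" // stands in for OPERATION_RESULT_SAVE_TO_FILE_DIR
    val engineId = "engine-123"
    val sessionId = "session-abc"
    val operationId = "operation-xyz"

    val enginePath = Paths.get(saveDir, engineId).toString
    val sessionPath = Paths.get(enginePath, sessionId).toString
    val operationPath = Paths.get(sessionPath, operationId).toString
    // operationPath == "/tmp/kyuubi/results/engine-123/session-abc/operation-xyz"

Deleting sessionPath recursively therefore also removes every operation result under it, which is what the new closeSession cleanup relies on.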
