This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 71649daed [KYUUBI #6437] Fix Spark engine query result save to HDFS
71649daed is described below

commit 71649daedcdc120a2c2949f93fa374f28baca4b5
Author: camper42 <[email protected]>
AuthorDate: Tue Jun 4 11:28:30 2024 +0800

    [KYUUBI #6437] Fix Spark engine query result save to HDFS
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #6437
    
    ## Describe Your Solution ๐Ÿ”ง
    
    Use `org.apache.hadoop.fs.Path` instead of `java.nio.file.Paths` to avoid an unexpected change to the scheme of `OPERATION_RESULT_SAVE_TO_FILE_DIR`.
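
    For reference, a minimal sketch (illustrative values, assuming a Unix default filesystem) of why the two path APIs behave differently for a scheme-qualified dir:

    ```scala
    import java.nio.file.Paths
    import org.apache.hadoop.fs.Path

    val dir = "jfs://datalake/tmp" // OPERATION_RESULT_SAVE_TO_FILE_DIR
    val engineId = "engine-1"      // hypothetical engine id

    // java.nio.file collapses the double slash and drops the authority:
    // "jfs:/datalake/tmp/engine-1"
    Paths.get(dir, engineId).toString

    // Hadoop's Path keeps the scheme and authority intact:
    // "jfs://datalake/tmp/engine-1"
    new Path(dir, engineId).toString
    ```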
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    The Spark job failed to start with the error `java.io.IOException: JuiceFS initialized failed for jfs:///` when `kyuubi.operation.result.saveToFile.dir=jfs://datalake/tmp` was configured.
    
    A dir like `hdfs://xxx:port/tmp` may encounter similar errors.
    
    #### Behavior With This Pull Request :tada:
    
    Users can use an HDFS dir as `kyuubi.operation.result.saveToFile.dir` without errors.
    
    #### Related Unit Tests
    
    It seems no test suites were added in #5591 or #5986; I'll try to build a dist and test with our internal cluster. A sketch of the kind of check such a suite could make is shown below.
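
    A hedged sketch of such an assertion (the Path joining mirrors `getOperationResultSavePath` in this diff; the dir and engine id values are hypothetical):

    ```scala
    import org.apache.hadoop.fs.Path

    val dir = "hdfs://nn-host:8020/tmp/kyuubi-results" // hypothetical config value
    val engineId = "engine-1"                          // hypothetical engine id
    val savePath = new Path(dir, engineId)

    // The configured scheme and authority must survive the join.
    assert(savePath.toUri.getScheme == "hdfs")
    assert(savePath.toUri.getAuthority == "nn-host:8020")
    assert(savePath.toString == s"$dir/$engineId")
    ```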
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6444 from camper42/save-to-hdfs.
    
    Closes #6437
    
    990f0a728 [camper42] [Kyuubi #6437] Fix Spark engine query result save to HDFS
    
    Authored-by: camper42 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/kyuubi/engine/spark/SparkSQLEngine.scala  | 15 ++++++---------
 .../engine/spark/operation/ExecuteStatement.scala    | 17 ++++++++---------
 .../spark/session/SparkSQLSessionManager.scala       | 20 +++++++++-----------
 3 files changed, 23 insertions(+), 29 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index d4418ec26..2d7367fe5 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -26,7 +26,6 @@ import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
 import com.google.common.annotations.VisibleForTesting
-import org.apache.hadoop.fs.Path
 import org.apache.spark.{ui, SparkConf}
 import org.apache.spark.kyuubi.{SparkContextHelper, SparkSQLEngineEventListener, SparkSQLEngineListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
@@ -92,10 +91,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }
 
     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val path = new Path(engineSavePath)
-      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-      fs.mkdirs(path)
-      fs.deleteOnExit(path)
+      val fs = engineSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      fs.mkdirs(engineSavePath)
+      fs.deleteOnExit(engineSavePath)
     }
   }
 
@@ -113,10 +111,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
         Duration(60, TimeUnit.SECONDS))
     })
     try {
-      val path = new Path(engineSavePath)
-      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-      if (fs.exists(path)) {
-        fs.delete(path, true)
+      val fs = engineSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      if (fs.exists(engineSavePath)) {
+        fs.delete(engineSavePath, true)
       }
     } catch {
       case e: Throwable => error(s"Error cleaning engine result save path: $engineSavePath", e)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 06193d83c..d67e8b251 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -49,7 +49,7 @@ class ExecuteStatement(
   override protected def supportProgress: Boolean = true
 
   private var fetchOrcStatement: Option[FetchOrcStatement] = None
-  private var saveFileName: Option[String] = None
+  private var saveFilePath: Option[Path] = None
   override protected def resultSchema: StructType = {
     if (result == null || result.schema.isEmpty) {
       new StructType().add("Result", "string")
@@ -71,9 +71,8 @@ class ExecuteStatement(
   override def close(): Unit = {
     super.close()
     fetchOrcStatement.foreach(_.close())
-    saveFileName.foreach { p =>
-      val path = new Path(p)
-      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+    saveFilePath.foreach { p =>
+      p.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(p, true)
     }
   }
 
@@ -179,7 +178,7 @@ class ExecuteStatement(
           resultSaveSizeThreshold,
           resultSaveRowsThreshold,
           result)) {
-        saveFileName =
+        saveFilePath =
           Some(
             session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
               session.handle,
@@ -190,14 +189,14 @@ class ExecuteStatement(
         // df.write will introduce an extra shuffle for the outermost limit, and hurt performance
         if (resultMaxRows > 0) {
           result.toDF(colName: _*).limit(resultMaxRows).write
-            .option("compression", "zstd").format("orc").save(saveFileName.get)
+            .option("compression", "zstd").format("orc").save(saveFilePath.get.toString)
         } else {
           result.toDF(colName: _*).write
-            .option("compression", "zstd").format("orc").save(saveFileName.get)
+            .option("compression", "zstd").format("orc").save(saveFilePath.get.toString)
         }
-        info(s"Save result to ${saveFileName.get}")
+        info(s"Save result to ${saveFilePath.get}")
         fetchOrcStatement = Some(new FetchOrcStatement(spark))
-        return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
+        return fetchOrcStatement.get.getIterator(saveFilePath.get.toString, resultSchema)
       }
       val internalArray = if (resultMaxRows <= 0) {
         info("Execute in full collect mode")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 574d98cfa..7144188a4 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -17,7 +17,6 @@
 
 package org.apache.kyuubi.engine.spark.session
 
-import java.nio.file.Paths
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import org.apache.hadoop.fs.Path
@@ -188,10 +187,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
       if (getSessionConf(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE, spark)) {
         val sessionSavePath = getSessionResultSavePath(sessionHandle)
         try {
-          val path = new Path(sessionSavePath)
-          val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-          if (fs.exists(path)) {
-            fs.delete(path, true)
+          val fs = sessionSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+          if (fs.exists(sessionSavePath)) {
+            fs.delete(sessionSavePath, true)
             info(s"Deleted session result path: $sessionSavePath")
           }
         } catch {
@@ -211,17 +209,17 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
 
   override protected def isServer: Boolean = false
 
-  private[spark] def getEngineResultSavePath(): String = {
-    Paths.get(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId).toString
+  private[spark] def getEngineResultSavePath(): Path = {
+    new Path(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId)
   }
 
-  private def getSessionResultSavePath(sessionHandle: SessionHandle): String = {
-    Paths.get(getEngineResultSavePath(), sessionHandle.identifier.toString).toString
+  private def getSessionResultSavePath(sessionHandle: SessionHandle): Path = {
+    new Path(getEngineResultSavePath(), sessionHandle.identifier.toString)
   }
 
   private[spark] def getOperationResultSavePath(
       sessionHandle: SessionHandle,
-      opHandle: OperationHandle): String = {
-    Paths.get(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString).toString
+      opHandle: OperationHandle): Path = {
+    new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
   }
 }
