This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 71649daed [KYUUBI #6437] Fix Spark engine query result save to HDFS
71649daed is described below
commit 71649daedcdc120a2c2949f93fa374f28baca4b5
Author: camper42 <[email protected]>
AuthorDate: Tue Jun 4 11:28:30 2024 +0800
[KYUUBI #6437] Fix Spark engine query result save to HDFS
# :mag: Description
## Issue References 🔗
This pull request fixes #6437
## Describe Your Solution 🔧
Use `org.apache.hadoop.fs.Path` instead of `java.nio.file.Paths` so that the
scheme of `OPERATION_RESULT_SAVE_TO_FILE_DIR` is not changed unexpectedly (see
the sketch below).
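As an illustration only (not part of the patch), the sketch below contrasts the
two APIs the way `getEngineResultSavePath` builds the save path; the directory
value and engine id are made up for the example:
```scala
import org.apache.hadoop.fs.Path

// Hypothetical values for illustration: the configured result dir and an engine id.
val dir = "jfs://datalake/tmp" // kyuubi.operation.result.saveToFile.dir
val engineId = "engine-0"

// java.nio.file.Paths treats the value as a local path and collapses the
// redundant slash, so the authority "datalake" is merged into the path:
val nioPath = java.nio.file.Paths.get(dir, engineId).toString
// nioPath == "jfs:/datalake/tmp/engine-0" (on the default Unix filesystem)

// org.apache.hadoop.fs.Path parses the value as a URI and keeps the
// scheme and authority intact:
val hadoopPath = new Path(dir, engineId).toString
// hadoopPath == "jfs://datalake/tmp/engine-0"
```
With the collapsed form, Hadoop later resolves a `jfs` URI without an
authority, which is presumably what surfaces as the `JuiceFS initialized
failed for jfs:///` error described in the test plan below.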
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
The Spark job fails to start with the error `java.io.IOException: JuiceFS
initialized failed for jfs:///` when configured with
`kyuubi.operation.result.saveToFile.dir=jfs://datalake/tmp`.
A directory such as `hdfs://xxx:port/tmp` may run into similar errors.
#### Behavior With This Pull Request :tada:
Users can use an HDFS directory as `kyuubi.operation.result.saveToFile.dir`
without errors.
#### Related Unit Tests
It seems no test suites were added in #5591 or #5986; I'll try to build a
distribution and test it with our internal cluster. A sketch of a possible
regression check follows below.
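A minimal sketch of such a check (not part of this patch; the suite name,
directory, and engine id are made up) could assert that the Hadoop `Path`
keeps the scheme and authority of the configured directory:
```scala
import org.apache.hadoop.fs.Path
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical regression check: a save path built with org.apache.hadoop.fs.Path
// must preserve the scheme and authority of the configured directory.
class ResultSavePathSuite extends AnyFunSuite {
  test("engine result save path preserves scheme and authority") {
    val dir = "hdfs://namenode:8020/tmp" // assumed kyuubi.operation.result.saveToFile.dir
    val engineId = "engine-0"            // assumed engine id
    val savePath = new Path(dir, engineId)
    assert(savePath.toUri.getScheme === "hdfs")
    assert(savePath.toUri.getAuthority === "namenode:8020")
    assert(savePath.toString === "hdfs://namenode:8020/tmp/engine-0")
  }
}
```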
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6444 from camper42/save-to-hdfs.
Closes #6437
990f0a728 [camper42] [Kyuubi #6437] Fix Spark engine query result save to
HDFS
Authored-by: camper42 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/kyuubi/engine/spark/SparkSQLEngine.scala | 15 ++++++---------
.../engine/spark/operation/ExecuteStatement.scala | 17 ++++++++---------
.../spark/session/SparkSQLSessionManager.scala | 20 +++++++++-----------
3 files changed, 23 insertions(+), 29 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index d4418ec26..2d7367fe5 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -26,7 +26,6 @@ import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import com.google.common.annotations.VisibleForTesting
-import org.apache.hadoop.fs.Path
import org.apache.spark.{ui, SparkConf}
import org.apache.spark.kyuubi.{SparkContextHelper, SparkSQLEngineEventListener, SparkSQLEngineListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
@@ -92,10 +91,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
}
if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
- val path = new Path(engineSavePath)
- val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
- fs.mkdirs(path)
- fs.deleteOnExit(path)
+ val fs = engineSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ fs.mkdirs(engineSavePath)
+ fs.deleteOnExit(engineSavePath)
}
}
@@ -113,10 +111,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
Duration(60, TimeUnit.SECONDS))
})
try {
- val path = new Path(engineSavePath)
- val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
- if (fs.exists(path)) {
- fs.delete(path, true)
+ val fs = engineSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ if (fs.exists(engineSavePath)) {
+ fs.delete(engineSavePath, true)
}
} catch {
case e: Throwable => error(s"Error cleaning engine result save path: $engineSavePath", e)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 06193d83c..d67e8b251 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -49,7 +49,7 @@ class ExecuteStatement(
override protected def supportProgress: Boolean = true
private var fetchOrcStatement: Option[FetchOrcStatement] = None
- private var saveFileName: Option[String] = None
+ private var saveFilePath: Option[Path] = None
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
new StructType().add("Result", "string")
@@ -71,9 +71,8 @@ class ExecuteStatement(
override def close(): Unit = {
super.close()
fetchOrcStatement.foreach(_.close())
- saveFileName.foreach { p =>
- val path = new Path(p)
- path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+ saveFilePath.foreach { p =>
+ p.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(p, true)
}
}
@@ -179,7 +178,7 @@ class ExecuteStatement(
resultSaveSizeThreshold,
resultSaveRowsThreshold,
result)) {
- saveFileName =
+ saveFilePath =
Some(
session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
session.handle,
@@ -190,14 +189,14 @@ class ExecuteStatement(
// df.write will introduce an extra shuffle for the outermost limit, and hurt performance
if (resultMaxRows > 0) {
result.toDF(colName: _*).limit(resultMaxRows).write
- .option("compression", "zstd").format("orc").save(saveFileName.get)
+ .option("compression",
"zstd").format("orc").save(saveFilePath.get.toString)
} else {
result.toDF(colName: _*).write
- .option("compression", "zstd").format("orc").save(saveFileName.get)
+ .option("compression",
"zstd").format("orc").save(saveFilePath.get.toString)
}
- info(s"Save result to ${saveFileName.get}")
+ info(s"Save result to ${saveFilePath.get}")
fetchOrcStatement = Some(new FetchOrcStatement(spark))
- return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
+ return fetchOrcStatement.get.getIterator(saveFilePath.get.toString, resultSchema)
}
val internalArray = if (resultMaxRows <= 0) {
info("Execute in full collect mode")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 574d98cfa..7144188a4 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.session
-import java.nio.file.Paths
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import org.apache.hadoop.fs.Path
@@ -188,10 +187,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
if (getSessionConf(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE, spark)) {
val sessionSavePath = getSessionResultSavePath(sessionHandle)
try {
- val path = new Path(sessionSavePath)
- val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
- if (fs.exists(path)) {
- fs.delete(path, true)
+ val fs = sessionSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ if (fs.exists(sessionSavePath)) {
+ fs.delete(sessionSavePath, true)
info(s"Deleted session result path: $sessionSavePath")
}
} catch {
@@ -211,17 +209,17 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
override protected def isServer: Boolean = false
- private[spark] def getEngineResultSavePath(): String = {
- Paths.get(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId).toString
+ private[spark] def getEngineResultSavePath(): Path = {
+ new Path(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId)
}
- private def getSessionResultSavePath(sessionHandle: SessionHandle): String = {
- Paths.get(getEngineResultSavePath(), sessionHandle.identifier.toString).toString
+ private def getSessionResultSavePath(sessionHandle: SessionHandle): Path = {
+ new Path(getEngineResultSavePath(), sessionHandle.identifier.toString)
}
private[spark] def getOperationResultSavePath(
sessionHandle: SessionHandle,
- opHandle: OperationHandle): String = {
- Paths.get(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString).toString
+ opHandle: OperationHandle): Path = {
+ new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
}
}