This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fd009d652f7 [SPARK-45642][CORE][SQL] Fix `FileSystem.isFile & FileSystem.isDirectory is deprecated`
fd009d652f7 is described below
commit fd009d652f7922254ccc7cc631b8df3a6b821532
Author: panbingkun <[email protected]>
AuthorDate: Sun Dec 10 14:11:19 2023 -0800
[SPARK-45642][CORE][SQL] Fix `FileSystem.isFile & FileSystem.isDirectory is deprecated`
### What changes were proposed in this pull request?
This PR aims to fix the deprecated `FileSystem.isFile` & `FileSystem.isDirectory` usages and to make some error messages more accurate.
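For illustration, a minimal sketch of the replacement pattern: the deprecated one-shot checks become a `getFileStatus` lookup that treats a missing path as `false`. The helper mirrors the new `SparkHadoopUtil.isFile` in the diff below; the object name and the example path are hypothetical.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object IsFileCheckSketch {
  // Replacement for the deprecated fs.isFile(path): fetch the FileStatus once
  // and treat a missing path as "not a file" instead of relying on the old API.
  def isFile(fs: FileSystem, path: Path): Boolean = {
    try {
      fs.getFileStatus(path).isFile
    } catch {
      case _: FileNotFoundException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val path = new Path("/tmp/example.txt") // hypothetical path
    val fs = path.getFileSystem(new Configuration())
    println(s"$path is a regular file: ${isFile(fs, path)}")
  }
}
```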
### Why are the changes needed?
- Prepare for a future Hadoop release that removes these methods.
- Reduce deprecation warnings.
- Make some error messages more accurate (see the sketch below).
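A hedged sketch of the more accurate error reporting, mirroring the `PartitioningAwareFileIndex` change in the diff below; the object name, method name, and `optionName` parameter are illustrative only. A missing base path now yields a distinct "not found" message instead of the generic "must be a directory" one.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.{FileSystem, Path}

object BasePathCheckSketch {
  // Sketch only: distinguish "path not found" from "path is not a directory",
  // instead of reporting both cases as "must be a directory".
  def requireDirectory(fs: FileSystem, basePath: Path, optionName: String): Unit = {
    try {
      if (!fs.getFileStatus(basePath).isDirectory) {
        throw new IllegalArgumentException(s"Option '$optionName' must be a directory")
      }
    } catch {
      case _: FileNotFoundException =>
        throw new IllegalArgumentException(s"Option '$optionName' not found")
    }
  }
}
```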
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Pass GA.
- Manually tested.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43505 from panbingkun/SPARK-45642.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 9 ++++++++-
.../apache/spark/deploy/history/FsHistoryProvider.scala | 2 +-
core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
.../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +-
.../execution/datasources/PartitioningAwareFileIndex.scala | 14 +++++++++++---
.../spark/sql/execution/streaming/FileStreamSink.scala | 2 +-
.../scala/org/apache/spark/streaming/util/HdfsUtils.scala | 2 +-
7 files changed, 24 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 50906f76b6e..628b688dedb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, FileNotFoundException, IOException}
import java.net.InetAddress
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
@@ -593,4 +593,11 @@ private[spark] object SparkHadoopUtil extends Logging {
}
}
+ def isFile(fs: FileSystem, path: Path): Boolean = {
+ try {
+ fs.getFileStatus(path).isFile
+ } catch {
+ case _: FileNotFoundException => false
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 565499bb610..73fb0086b33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -860,7 +860,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
try {
// Fetch the entry first to avoid an RPC when it's already removed.
listing.read(classOf[LogInfo], inProgressLog)
- if (!fs.isFile(new Path(inProgressLog))) {
+ if (!SparkHadoopUtil.isFile(fs, new Path(inProgressLog))) {
listing.synchronized {
listing.delete(classOf[LogInfo], inProgressLog)
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 049999281f5..a074bd53d26 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -674,7 +674,7 @@ private[spark] object Utils
throw new IOException(s"Failed to create directory ${targetDir.getPath}")
}
val dest = new File(targetDir, filename.getOrElse(path.getName))
- if (fs.isFile(path)) {
+ if (fs.getFileStatus(path).isFile) {
val in = fs.open(path)
try {
downloadFile(path.toString, in, dest, fileOverwrite)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index e235b8aeb77..d16a15df1b5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1774,7 +1774,7 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
status
}
- override def isFile(path: Path): Boolean = super.isFile(local(path))
+ override def getFileStatus(path: Path): FileStatus = super.getFileStatus(local(path))
override def globStatus(pathPattern: Path): Array[FileStatus] = {
val newPath = new Path(pathPattern.toUri.getPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index dc41afe226b..3efe614bcef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.io.FileNotFoundException
+
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
@@ -222,9 +224,15 @@ abstract class PartitioningAwareFileIndex(
caseInsensitiveMap.get(FileIndexOptions.BASE_PATH_PARAM).map(new Path(_)) match {
case Some(userDefinedBasePath) =>
val fs = userDefinedBasePath.getFileSystem(hadoopConf)
- if (!fs.isDirectory(userDefinedBasePath)) {
- throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
- s"must be a directory")
+ try {
+ if (!fs.getFileStatus(userDefinedBasePath).isDirectory) {
+ throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
+ s"must be a directory")
+ }
+ } catch {
+ case _: FileNotFoundException =>
+ throw new IllegalArgumentException(s"Option '${FileIndexOptions.BASE_PATH_PARAM}' " +
+ s"not found")
}
val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
val qualifiedBasePathStr = qualifiedBasePath.toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 04a1de02ea5..ea8db3c99de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -51,7 +51,7 @@ object FileStreamSink extends Logging {
val hdfsPath = new Path(singlePath)
try {
val fs = hdfsPath.getFileSystem(hadoopConf)
- if (fs.isDirectory(hdfsPath)) {
+ if (fs.getFileStatus(hdfsPath).isDirectory) {
val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
fs.exists(metadataPath)
} else {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index ef040681adf..703fcb5edb3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -30,7 +30,7 @@ private[streaming] object HdfsUtils {
val dfs = getFileSystemForPath(dfsPath, conf)
// If the file exists and we have append support, append instead of creating a new file
val stream: FSDataOutputStream = {
- if (dfs.isFile(dfsPath)) {
+ if (SparkHadoopUtil.isFile(dfs, dfsPath)) {
if (conf.getBoolean("dfs.support.append", true) ||
conf.getBoolean("hdfs.append.support", false) ||
dfs.isInstanceOf[RawLocalFileSystem]) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]