This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a76bd9dcf96 [SPARK-42379][SS] Use FileSystem.exists in
FileSystemBasedCheckpointFileManager.exists
a76bd9dcf96 is described below
commit a76bd9dcf961e0e8bfd6e14ae30a249667f04982
Author: Jungtaek Lim <[email protected]>
AuthorDate: Thu Feb 9 07:12:13 2023 +0900
[SPARK-42379][SS] Use FileSystem.exists in
FileSystemBasedCheckpointFileManager.exists
### What changes were proposed in this pull request?
This PR proposes to use FileSystem.exists in
FileSystemBasedCheckpointFileManager.exists, making it consistent with the other
methods in FileSystemBasedCheckpointFileManager.
This PR also removes the test case
`QueryExecutionErrorsSuite` "FAILED_RENAME_PATH: rename when destination path
already exists", because the test relies on an incorrect custom file system
instance in which `FileSystemBasedCheckpointFileManager.exists` and
`FileSystem.exists` behave asymmetrically.
(See detailed explanation from
https://github.com/apache/spark/pull/39936#issuecomment-1422101967)
### Why are the changes needed?
Other methods in FileSystemBasedCheckpointFileManager (e.g. `renameTempFile`)
already use FileSystem.exists whenever they check whether a path exists, so
`exists` should delegate to it as well.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #39936 from
HeartSaVioR/MINOR-FileSystemBasedCheckpointFileManager-calls-fs-exists-in-exists.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/CheckpointFileManager.scala | 7 +----
.../sql/errors/QueryExecutionErrorsSuite.scala | 35 ----------------------
2 files changed, 1 insertion(+), 41 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 013efd3c7ba..6df0a2f3063 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -256,12 +256,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
fs.open(path)
}
- override def exists(path: Path): Boolean =
- try {
- fs.getFileStatus(path) != null
- } catch {
- case _: FileNotFoundException => false
- }
+ override def exists(path: Path): Boolean = fs.exists(path)
override def renameTempFile(srcPath: Path, dstPath: Path,
overwriteIfPossible: Boolean): Unit = {
if (!overwriteIfPossible && fs.exists(dstPath)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 89e0bf7fe41..90180d5e600 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -634,37 +634,6 @@ class QueryExecutionErrorsSuite
sqlState = "0A000")
}
- test("FAILED_RENAME_PATH: rename when destination path already exists") {
- withTempPath { p =>
- withSQLConf(
- "spark.sql.streaming.checkpointFileManagerClass" ->
- classOf[FileSystemBasedCheckpointFileManager].getName,
- "fs.file.impl" -> classOf[FakeFileSystemAlwaysExists].getName,
- // FileSystem caching could cause a different implementation of
fs.file to be used
- "fs.file.impl.disable.cache" -> "true") {
- val checkpointLocation = p.getAbsolutePath
-
- val ds = spark.readStream.format("rate").load()
- val e = intercept[SparkConcurrentModificationException] {
- ds.writeStream
- .option("checkpointLocation", checkpointLocation)
- .queryName("_")
- .format("memory")
- .start()
- }
-
- val expectedPath = p.toURI
- checkError(
- exception = e.getCause.asInstanceOf[SparkFileAlreadyExistsException],
- errorClass = "FAILED_RENAME_PATH",
- sqlState = Some("42K04"),
- matchPVals = true,
- parameters = Map("sourcePath" -> s"$expectedPath.+",
- "targetPath" -> s"$expectedPath.+"))
- }
- }
- }
-
test("RENAME_SRC_PATH_NOT_FOUND: rename the file which source path does not
exist") {
withTempPath { p =>
withSQLConf(
@@ -805,10 +774,6 @@ class FakeFileSystemSetPermission extends LocalFileSystem {
}
}
-class FakeFileSystemAlwaysExists extends DebugFilesystem {
- override def exists(f: Path): Boolean = true
-}
-
class FakeFileSystemNeverExists extends DebugFilesystem {
override def exists(f: Path): Boolean = false
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]