This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a76bd9dcf96 [SPARK-42379][SS] Use FileSystem.exists in 
FileSystemBasedCheckpointFileManager.exists
a76bd9dcf96 is described below

commit a76bd9dcf961e0e8bfd6e14ae30a249667f04982
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Feb 9 07:12:13 2023 +0900

    [SPARK-42379][SS] Use FileSystem.exists in 
FileSystemBasedCheckpointFileManager.exists
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to use FileSystem.exists in 
FileSystemBasedCheckpointFileManager.exists, making it consistent with the other 
methods in FileSystemBasedCheckpointFileManager.
    
    This PR also removes the test case 
QueryExecutionErrorsSuite "FAILED_RENAME_PATH: rename when destination path 
already exists", because the test relies on an incorrect custom file system 
(`FakeFileSystemAlwaysExists`) whose implementation is not symmetric between 
`FileSystemBasedCheckpointFileManager.exists` and `FileSystem.exists`.
    (See the detailed explanation at 
https://github.com/apache/spark/pull/39936#issuecomment-1422101967)
    
    ### Why are the changes needed?
    
    Other methods in FileSystemBasedCheckpointFileManager already use 
FileSystem.exists wherever they check whether a path exists.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #39936 from 
HeartSaVioR/MINOR-FileSystemBasedCheckpointFileManager-calls-fs-exists-in-exists.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/CheckpointFileManager.scala          |  7 +----
 .../sql/errors/QueryExecutionErrorsSuite.scala     | 35 ----------------------
 2 files changed, 1 insertion(+), 41 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 013efd3c7ba..6df0a2f3063 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -256,12 +256,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, 
hadoopConf: Configuration
     fs.open(path)
   }
 
-  override def exists(path: Path): Boolean =
-    try {
-      fs.getFileStatus(path) != null
-    } catch {
-      case _: FileNotFoundException => false
-    }
+  override def exists(path: Path): Boolean = fs.exists(path)
 
   override def renameTempFile(srcPath: Path, dstPath: Path, 
overwriteIfPossible: Boolean): Unit = {
     if (!overwriteIfPossible && fs.exists(dstPath)) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 89e0bf7fe41..90180d5e600 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -634,37 +634,6 @@ class QueryExecutionErrorsSuite
       sqlState = "0A000")
   }
 
-  test("FAILED_RENAME_PATH: rename when destination path already exists") {
-    withTempPath { p =>
-      withSQLConf(
-        "spark.sql.streaming.checkpointFileManagerClass" ->
-          classOf[FileSystemBasedCheckpointFileManager].getName,
-        "fs.file.impl" -> classOf[FakeFileSystemAlwaysExists].getName,
-        // FileSystem caching could cause a different implementation of 
fs.file to be used
-        "fs.file.impl.disable.cache" -> "true") {
-        val checkpointLocation = p.getAbsolutePath
-
-        val ds = spark.readStream.format("rate").load()
-        val e = intercept[SparkConcurrentModificationException] {
-          ds.writeStream
-            .option("checkpointLocation", checkpointLocation)
-            .queryName("_")
-            .format("memory")
-            .start()
-        }
-
-        val expectedPath = p.toURI
-        checkError(
-          exception = e.getCause.asInstanceOf[SparkFileAlreadyExistsException],
-          errorClass = "FAILED_RENAME_PATH",
-          sqlState = Some("42K04"),
-          matchPVals = true,
-          parameters = Map("sourcePath" -> s"$expectedPath.+",
-            "targetPath" -> s"$expectedPath.+"))
-      }
-    }
-  }
-
   test("RENAME_SRC_PATH_NOT_FOUND: rename the file which source path does not 
exist") {
     withTempPath { p =>
       withSQLConf(
@@ -805,10 +774,6 @@ class FakeFileSystemSetPermission extends LocalFileSystem {
   }
 }
 
-class FakeFileSystemAlwaysExists extends DebugFilesystem {
-  override def exists(f: Path): Boolean = true
-}
-
 class FakeFileSystemNeverExists extends DebugFilesystem {
   override def exists(f: Path): Boolean = false
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to