This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 147a98b7e1a3 [SPARK-48991][SQL] Move path initialization into 
try-catch block in FileStreamSink.hasMetadata
147a98b7e1a3 is described below

commit 147a98b7e1a374b859c229a35d418cd88d71bcb2
Author: Kent Yao <[email protected]>
AuthorDate: Wed Jul 24 19:47:29 2024 +0800

    [SPARK-48991][SQL] Move path initialization into try-catch block in 
FileStreamSink.hasMetadata
    
    ### What changes were proposed in this pull request?
    
    This pull request proposed to move path initialization into try-catch block 
in FileStreamSink.hasMetadata. Then, exceptions from invalid paths can be 
handled consistently like other path-related exceptions in the current 
try-catch block. At last, we can make the errors fall into the correct code 
branches to be handled
    
    ### Why are the changes needed?
    
    bugfix for improperly handled exceptions in FileStreamSink.hasMetadata
    
    ### Does this PR introduce _any_ user-facing change?
    
    no, an invalid path is still invalid, but fails in the correct places
    
    ### How was this patch tested?
    
    new test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #47471 from yaooqinn/SPARK-48991.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
    (cherry picked from commit d68cde812c6f904d6f01b7fde1eed10b12edd766)
    Signed-off-by: Kent Yao <[email protected]>
---
 .../spark/sql/execution/streaming/FileStreamSink.scala      |  2 +-
 .../apache/spark/sql/streaming/FileStreamSinkSuite.scala    | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 04a1de02ea58..23855db9d7f5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -48,8 +48,8 @@ object FileStreamSink extends Logging {
 
     path match {
       case Seq(singlePath) =>
-        val hdfsPath = new Path(singlePath)
         try {
+          val hdfsPath = new Path(singlePath)
           val fs = hdfsPath.getFileSystem(hadoopConf)
           if (fs.isDirectory(hdfsPath)) {
             val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 75f440caefc3..1954cce7fdc2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -650,6 +650,19 @@ abstract class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-48991: Move path initialization into try-catch block") {
+    val logAppender = new LogAppender("Assume no metadata directory.")
+    Seq(null, "", "file:tmp").foreach { path =>
+      withLogAppender(logAppender) {
+        assert(!FileStreamSink.hasMetadata(Seq(path), 
spark.sessionState.newHadoopConf(), conf))
+      }
+
+      
assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
+        "Assume no metadata directory. Error while looking for metadata 
directory in the path:" +
+        s" $path."))
+    }
+  }
 }
 
 object PendingCommitFilesTrackingManifestFileCommitProtocol {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to