This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cbfb07f94c9 [SPARK-45452][SQL][FOLLOWUP] Do not fail too early if 
FileSystem does not implement getScheme
5cbfb07f94c9 is described below

commit 5cbfb07f94c9b1a7e0ffc363f34de8ffee5f598d
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Oct 24 09:36:29 2023 -0700

    [SPARK-45452][SQL][FOLLOWUP] Do not fail too early if FileSystem does not 
implement getScheme
    
    ### What changes were proposed in this pull request?
    
    This is a small followup of https://github.com/apache/spark/pull/43261 . 
When a `FileSystem` does not implement `getScheme`, we fail earlier than before 
after https://github.com/apache/spark/pull/43261 . This PR restores the timing 
of throwing the exception by skipping the new optimization if `getScheme` fails.
    
    ### Why are the changes needed?
    
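    To fix a small behavior change: after https://github.com/apache/spark/pull/43261 , a `FileSystem` that does not implement `getScheme` fails earlier than it did before.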
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Reverted the test changes made in https://github.com/apache/spark/pull/43261 ; 
the tests still pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43508 from cloud-fan/follow.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/execution/datasources/InMemoryFileIndex.scala      | 9 +++++++--
 .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala  | 2 --
 .../sql/execution/datasources/FileSourceStrategySuite.scala      | 2 --
 .../org/apache/spark/sql/streaming/FileStreamSinkSuite.scala     | 1 -
 .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala   | 2 --
 5 files changed, 7 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 3335454023b6..0f66aa816d96 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
@@ -153,8 +154,12 @@ object InMemoryFileIndex extends Logging {
       
sparkSession.sessionState.conf.useListFilesFileSystemList.split(",").map(_.trim)
     val ignoreMissingFiles =
       new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreMissingFiles
-    val useListFiles = paths.size == 1 &&
-      fileSystemList.contains(paths.head.getFileSystem(hadoopConf).getScheme)
+    val useListFiles = try {
+      val scheme = paths.head.getFileSystem(hadoopConf).getScheme
+      paths.size == 1 && fileSystemList.contains(scheme)
+    } catch {
+      case NonFatal(_) => false
+    }
     if (useListFiles) {
       HadoopFSUtils.listFiles(
         path = paths.head,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 56c4cde521db..fefbbec56e43 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2432,8 +2432,6 @@ object FakeLocalFsFileSystem {
 class FakeLocalFsFileSystem extends RawLocalFileSystem {
   import FakeLocalFsFileSystem._
 
-  override def getScheme(): String = "fakelocalfs"
-
   override def delete(f: Path, recursive: Boolean): Boolean = {
     aclStatus = new AclStatus.Builder().build()
     super.delete(f, recursive)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 6527b47211b3..91182f6473d7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -774,8 +774,6 @@ class TestFileFormat extends TextBasedFileFormat {
 class LocalityTestFileSystem extends RawLocalFileSystem {
   private val invocations = new AtomicInteger(0)
 
-  override def getScheme(): String = "localitytest"
-
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
     require(!file.isDirectory, "The file path can not be a directory.")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index f03f7375b53f..5c8a55b3f63b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -768,7 +768,6 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
  * access.
  */
 class FailFormatCheckFileSystem extends RawLocalFileSystem {
-  override def getScheme(): String = "failformatcheck"
   override def getFileStatus(f: Path): FileStatus = {
     if (f.getName == FileStreamSink.metadataDir) {
       throw new IOException("cannot access metadata log")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index d6867da6b4ed..f8ff4b6f85e6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2391,8 +2391,6 @@ class FileStreamSourceStressTestSuite extends 
FileStreamSourceTest {
 class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
   import ExistsThrowsExceptionFileSystem._
 
-  override def getScheme(): String = "existsthrowsexception"
-
   override def getUri: URI = {
     URI.create(s"$scheme:///")
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to