This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 028c472b72c [SPARK-39014][SQL] Respect `ignoreMissingFiles` from Data 
Source options in `InMemoryFileIndex`
028c472b72c is described below

commit 028c472b72cdd0663c2d1e7769fee17e31e0abfb
Author: yaohua <[email protected]>
AuthorDate: Tue Apr 26 13:15:41 2022 +0900

    [SPARK-39014][SQL] Respect `ignoreMissingFiles` from Data Source options in 
`InMemoryFileIndex`
    
    ### What changes were proposed in this pull request?
    Currently, `InMemoryFileIndex` still only get `ignoreMissingFiles` from SQL 
Conf, we should respect `ignoreMissingFiles` from data source options as well.
    
    ### Why are the changes needed?
    As a part of the effort #36069 here
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Modified the existing UT
    
    Closes #36348 from Yaohua628/spark-39014.
    
    Authored-by: yaohua <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/execution/datasources/InMemoryFileIndex.scala     | 10 +++++++---
 .../spark/sql/execution/datasources/FileIndexSuite.scala  | 15 +++++++++++----
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 6c3deee2c31..44d31131e9c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FileSourceOptions
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.streaming.FileStreamSink
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.HadoopFSUtils
@@ -128,7 +130,7 @@ class InMemoryFileIndex(
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, 
this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch.toSeq, hadoopConf, filter, sparkSession)
+      pathsToFetch.toSeq, hadoopConf, filter, sparkSession, parameters)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -146,13 +148,15 @@ object InMemoryFileIndex extends Logging {
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+      sparkSession: SparkSession,
+      parameters: Map[String, String] = Map.empty): Seq[(Path, 
Seq[FileStatus])] = {
     HadoopFSUtils.parallelListLeafFiles(
       sc = sparkSession.sparkContext,
       paths = paths,
       hadoopConf = hadoopConf,
       filter = new PathFilterWrapper(filter),
-      ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
+      ignoreMissingFiles =
+        new 
FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreMissingFiles,
       ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
       parallelismThreshold = 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
       parallelismMax = 
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 08ddc67cd65..1897a347ef1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -191,20 +191,27 @@ class FileIndexSuite extends SharedSparkSession {
         classOf[SubdirectoryDeletionRaceFileSystem],
         classOf[FileDeletionRaceFileSystem]
       );
-      ignoreMissingFiles <- Seq(true, false);
+      (ignoreMissingFiles, sqlConf, options) <- Seq(
+        (true, "true", Map.empty[String, String]),
+        // Explicitly set sqlConf to false, but data source options should 
take precedence
+        (true, "false", Map("ignoreMissingFiles" -> "true")),
+        (false, "false", Map.empty[String, String]),
+        // Explicitly set sqlConf to true, but data source options should take 
precedence
+        (false, "true", Map("ignoreMissingFiles" -> "false"))
+      );
       parDiscoveryThreshold <- Seq(0, 100)
     ) {
       withClue(s"raceCondition=$raceCondition, 
ignoreMissingFiles=$ignoreMissingFiles, " +
-        s"parDiscoveryThreshold=$parDiscoveryThreshold"
+        s"parDiscoveryThreshold=$parDiscoveryThreshold, sqlConf=$sqlConf, 
options=$options"
       ) {
         withSQLConf(
-          SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissingFiles.toString,
+          SQLConf.IGNORE_MISSING_FILES.key -> sqlConf,
           SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> 
parDiscoveryThreshold.toString,
           "fs.mockFs.impl" -> raceCondition.getName,
           "fs.mockFs.impl.disable.cache" -> "true"
         ) {
           def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex(
-            spark, Seq(rootDirPath), Map.empty, None)
+            spark, Seq(rootDirPath), options, None)
           if (ignoreMissingFiles) {
             // We're ignoring missing files, so catalog construction should 
succeed
             val catalog = makeCatalog()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to