This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 028c472b72c [SPARK-39014][SQL] Respect `ignoreMissingFiles` from Data
Source options in `InMemoryFileIndex`
028c472b72c is described below
commit 028c472b72cdd0663c2d1e7769fee17e31e0abfb
Author: yaohua <[email protected]>
AuthorDate: Tue Apr 26 13:15:41 2022 +0900
[SPARK-39014][SQL] Respect `ignoreMissingFiles` from Data Source options in
`InMemoryFileIndex`
### What changes were proposed in this pull request?
Currently, `InMemoryFileIndex` still only gets `ignoreMissingFiles` from the SQL
Conf; we should respect `ignoreMissingFiles` from data source options as well.
### Why are the changes needed?
As a part of the effort tracked in #36069.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified the existing UT
Closes #36348 from Yaohua628/spark-39014.
Authored-by: yaohua <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/execution/datasources/InMemoryFileIndex.scala | 10 +++++++---
.../spark/sql/execution/datasources/FileIndexSuite.scala | 15 +++++++++++----
2 files changed, 18 insertions(+), 7 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 6c3deee2c31..44d31131e9c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FileSourceOptions
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.streaming.FileStreamSink
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.HadoopFSUtils
@@ -128,7 +130,7 @@ class InMemoryFileIndex(
}
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf,
this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
- pathsToFetch.toSeq, hadoopConf, filter, sparkSession)
+ pathsToFetch.toSeq, hadoopConf, filter, sparkSession, parameters)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -146,13 +148,15 @@ object InMemoryFileIndex extends Logging {
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
- sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+ sparkSession: SparkSession,
+ parameters: Map[String, String] = Map.empty): Seq[(Path,
Seq[FileStatus])] = {
HadoopFSUtils.parallelListLeafFiles(
sc = sparkSession.sparkContext,
paths = paths,
hadoopConf = hadoopConf,
filter = new PathFilterWrapper(filter),
- ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
+ ignoreMissingFiles =
+ new
FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreMissingFiles,
ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
parallelismThreshold =
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
parallelismMax =
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 08ddc67cd65..1897a347ef1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -191,20 +191,27 @@ class FileIndexSuite extends SharedSparkSession {
classOf[SubdirectoryDeletionRaceFileSystem],
classOf[FileDeletionRaceFileSystem]
);
- ignoreMissingFiles <- Seq(true, false);
+ (ignoreMissingFiles, sqlConf, options) <- Seq(
+ (true, "true", Map.empty[String, String]),
+ // Explicitly set sqlConf to false, but data source options should
take precedence
+ (true, "false", Map("ignoreMissingFiles" -> "true")),
+ (false, "false", Map.empty[String, String]),
+ // Explicitly set sqlConf to true, but data source options should take
precedence
+ (false, "true", Map("ignoreMissingFiles" -> "false"))
+ );
parDiscoveryThreshold <- Seq(0, 100)
) {
withClue(s"raceCondition=$raceCondition,
ignoreMissingFiles=$ignoreMissingFiles, " +
- s"parDiscoveryThreshold=$parDiscoveryThreshold"
+ s"parDiscoveryThreshold=$parDiscoveryThreshold, sqlConf=$sqlConf,
options=$options"
) {
withSQLConf(
- SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissingFiles.toString,
+ SQLConf.IGNORE_MISSING_FILES.key -> sqlConf,
SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key ->
parDiscoveryThreshold.toString,
"fs.mockFs.impl" -> raceCondition.getName,
"fs.mockFs.impl.disable.cache" -> "true"
) {
def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex(
- spark, Seq(rootDirPath), Map.empty, None)
+ spark, Seq(rootDirPath), options, None)
if (ignoreMissingFiles) {
// We're ignoring missing files, so catalog construction should
succeed
val catalog = makeCatalog()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]