This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b82dfd6cfb [SPARK-45452][SQL] Improve `InMemoryFileIndex` to use 
`FileSystem.listFiles` API
24b82dfd6cfb is described below

commit 24b82dfd6cfb9a658af615446be5423695830dd9
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Oct 7 16:52:33 2023 -0700

    [SPARK-45452][SQL] Improve `InMemoryFileIndex` to use 
`FileSystem.listFiles` API
    
    ### What changes were proposed in this pull request?
    
    This PR aims to use `InMemoryFileIndex` to use `FileSystem.listFiles` API 
instead of many recursive FileSystem calls.
    
    To do this, a new configuration is introduced with the default value, `s3a`.
    
    ```
    spark.sql.sources.useListFilesFileSystemList=s3a
    ```
    
    ### Why are the changes needed?
    
    To improve the nested directory listing performance. Currently, it's 
designed to apply for a single root directory listing case on S3A file system. 
For example, the improvement on a 3-year-data table with `year/month/day/hour` 
hierarchy is **158x**.
    
    **3-YEAR PARTITIONED TABLE (2021 ~ 2023)**
    ```
    $ bin/spark-sql
    spark-sql (default)> CREATE TABLE t USING CSV PARTITIONED BY (year, month, 
day, hour) LOCATION "file:///tmp/three_year" AS SELECT /*+ REPARTITION(64, 
year, month, day, hour) */ Y.id year, M.id month, D.id day, H.id hour, 0 value 
FROM RANGE(2021, 2024) Y, RANGE(1, 13) M, RANGE(1, 31) D, RANGE(0, 24) H;
    
    $ aws s3 sync three_year s3://dongjoon/three_year
    
    $ aws s3 ls s3://dongjoon/three_year --summarize --recursive | more
    2023-10-07 01:07:00          8 three_year/._SUCCESS.crc
    2023-10-07 01:07:00          0 three_year/_SUCCESS
    2023-10-07 01:07:00         12 
three_year/year=2021/month=1/day=1/hour=0/.part-00033-264e1d55-d466-43c8-8972-283a1f84082b.c000.csv.crc
    2023-10-07 01:07:00          2 
three_year/year=2021/month=1/day=1/hour=0/part-00033-264e1d55-d466-43c8-8972-283a1f84082b.c000.csv
    ...
    
    $ aws s3 ls s3://dongjoon/three_year --summarize --recursive | tail -n2
    Total Objects: 51842
       Total Size: 362888
    ```
    
    **AFTER (11s)**
    ```
    $ bin/spark-shell \
    -c 
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider
    ...
    scala> spark.time(new 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(spark, Seq(new 
org.apache.hadoop.fs.Path("s3a://dongjoon/three_year")), Map.empty, None))
    Time taken: 11270 ms
    ```
    
    **BEFORE (1741s)**
    - I used one year to estimate 3 years at this time. I'll update with 3 year 
result soon.
    ```
    $ bin/spark-shell \
    -c 
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider
 \
    -c spark.sql.sources.useListFilesFileSystemList=none
    ...
    scala> spark.time(new 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(spark, Seq(new 
org.apache.hadoop.fs.Path("s3a://dongjoon/three_year")), Map.empty, None))
    Time taken: 1741315 ms
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43261 from dongjoon-hyun/SPARK-45452.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/util/HadoopFSUtils.scala      | 42 ++++++++++++++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 12 +++++++
 .../execution/datasources/InMemoryFileIndex.scala  | 32 +++++++++++------
 .../spark/sql/execution/command/DDLSuite.scala     |  2 ++
 .../sql/execution/datasources/FileIndexSuite.scala | 41 +++++++++++++++++++++
 .../datasources/FileSourceStrategySuite.scala      |  2 ++
 .../spark/sql/streaming/FileStreamSinkSuite.scala  |  1 +
 .../sql/streaming/FileStreamSourceSuite.scala      |  2 ++
 8 files changed, 124 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala 
b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index db1eeeef4712..eb0a79c7c69b 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -69,6 +69,38 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
   }
 
+  /**
+   * Lists a collection of paths recursively with a single API invocation.
+   * Like parallelListLeafFiles, this ignores FileNotFoundException on the 
given root path.
+   *
+   * This is able to be called on both driver and executors.
+   *
+   * @param path a path to list
+   * @param hadoopConf Hadoop configuration
+   * @param filter Path filter used to exclude leaf files from result
+   * @return  the set of discovered files for the path
+   */
+  def listFiles(
+      path: Path,
+      hadoopConf: Configuration,
+      filter: PathFilter): Seq[(Path, Seq[FileStatus])] = {
+    logInfo(s"Listing $path with listFiles API")
+    try {
+      val remoteIter = path.getFileSystem(hadoopConf).listFiles(path, true)
+      val statues = new Iterator[LocatedFileStatus]() {
+        def next(): LocatedFileStatus = remoteIter.next
+        def hasNext(): Boolean = remoteIter.hasNext
+      }.filterNot(status => shouldFilterOutPath(status.getPath.toString))
+        .filter(f => filter.accept(f.getPath))
+        .toArray
+      Seq((path, statues))
+    } catch {
+      case _: FileNotFoundException =>
+        logWarning(s"The root directory $path was not found. Was it deleted 
very recently?")
+        Seq((path, Array.empty[FileStatus]))
+    }
+  }
+
   private def parallelListLeafFilesInternal(
       sc: SparkContext,
       paths: Seq[Path],
@@ -309,4 +341,14 @@ private[spark] object HadoopFSUtils extends Logging {
     val include = pathName.startsWith("_common_metadata") || 
pathName.startsWith("_metadata")
     exclude && !include
   }
+
+  /** Checks if we should filter out this path. */
+  def shouldFilterOutPath(path: String): Boolean = {
+    val exclude = (path.contains("/_") && !path.contains("=")) || 
path.contains("/.")
+    val include = path.contains("/_common_metadata/") ||
+      path.endsWith("/_common_metadata") ||
+      path.contains("/_metadata/") ||
+      path.endsWith("/_metadata")
+    (exclude && !include) || shouldFilterOutPathName(path)
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index db6cc2736c9e..65d2e6136e9f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1641,6 +1641,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val USE_LISTFILES_FILESYSTEM_LIST =
+    buildConf("spark.sql.sources.useListFilesFileSystemList")
+      .doc("A comma-separated list of file system schemes to use 
FileSystem.listFiles API " +
+        "for a single root path listing")
+      .version("4.0.0")
+      .internal()
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .createWithDefault("s3a")
+
   // Whether to automatically resolve ambiguity in join conditions for 
self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -5257,6 +5267,8 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)
 
+  def useListFilesFileSystemList: String = 
getConf(SQLConf.USE_LISTFILES_FILESYSTEM_LIST)
+
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
   def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 4d917994123f..3335454023b6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -149,16 +149,28 @@ object InMemoryFileIndex extends Logging {
       filter: PathFilter,
       sparkSession: SparkSession,
       parameters: Map[String, String] = Map.empty): Seq[(Path, 
Seq[FileStatus])] = {
-    HadoopFSUtils.parallelListLeafFiles(
-      sc = sparkSession.sparkContext,
-      paths = paths,
-      hadoopConf = hadoopConf,
-      filter = new PathFilterWrapper(filter),
-      ignoreMissingFiles =
-        new 
FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreMissingFiles,
-      ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
-      parallelismThreshold = 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
-      parallelismMax = 
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
+    val fileSystemList =
+      
sparkSession.sessionState.conf.useListFilesFileSystemList.split(",").map(_.trim)
+    val ignoreMissingFiles =
+      new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreMissingFiles
+    val useListFiles = paths.size == 1 &&
+      fileSystemList.contains(paths.head.getFileSystem(hadoopConf).getScheme)
+    if (useListFiles) {
+      HadoopFSUtils.listFiles(
+        path = paths.head,
+        hadoopConf = hadoopConf,
+        filter = new PathFilterWrapper(filter))
+    } else {
+      HadoopFSUtils.parallelListLeafFiles(
+        sc = sparkSession.sparkContext,
+        paths = paths,
+        hadoopConf = hadoopConf,
+        filter = new PathFilterWrapper(filter),
+        ignoreMissingFiles = ignoreMissingFiles,
+        ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
+        parallelismThreshold = 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
+        parallelismMax = 
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
+    }
   }
 
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0692a794f5d5..b037aaac15e5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2417,6 +2417,8 @@ object FakeLocalFsFileSystem {
 class FakeLocalFsFileSystem extends RawLocalFileSystem {
   import FakeLocalFsFileSystem._
 
+  override def getScheme(): String = "fakelocalfs"
+
   override def delete(f: Path, recursive: Boolean): Boolean = {
     aclStatus = new AclStatus.Builder().build()
     super.delete(f, recursive)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 9ac61f0cee5f..b0193813bdfd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -285,6 +285,22 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-45452: PartitioningAwareFileIndex uses listFiles API for large 
child dirs") {
+    withSQLConf(SQLConf.USE_LISTFILES_FILESYSTEM_LIST.key -> "file") {
+      for (scale <- Seq(10, 50)) {
+        withTempDir { dir =>
+          for (i <- 1 to scale) {
+            new File(dir, s"foo=$i.txt").mkdir()
+          }
+          HiveCatalogMetrics.reset()
+          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+          new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), 
Map.empty, None)
+          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        }
+      }
+    }
+  }
+
   test("PartitioningAwareFileIndex listing parallelized with large, deeply 
nested child dirs") {
     for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
       withTempDir { dir =>
@@ -307,6 +323,31 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-45452: PartitioningAwareFileIndex listing parallelized with 
large, deeply nested " +
+      "child dirs") {
+    withSQLConf(SQLConf.USE_LISTFILES_FILESYSTEM_LIST.key -> "file") {
+      for (scale <- Seq(10, 50)) {
+        withTempDir { dir =>
+          for (i <- 1 to 2) {
+            val subdirA = new File(dir, s"a=$i")
+            subdirA.mkdir()
+            for (j <- 1 to 2) {
+              val subdirB = new File(subdirA, s"b=$j")
+              subdirB.mkdir()
+              for (k <- 1 to scale) {
+                new File(subdirB, s"foo=$k.txt").mkdir()
+              }
+            }
+          }
+          HiveCatalogMetrics.reset()
+          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+          new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), 
Map.empty, None)
+          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        }
+      }
+    }
+  }
+
   test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
     class MockCatalog(
       override val rootPaths: Seq[Path])
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 91182f6473d7..6527b47211b3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -774,6 +774,8 @@ class TestFileFormat extends TextBasedFileFormat {
 class LocalityTestFileSystem extends RawLocalFileSystem {
   private val invocations = new AtomicInteger(0)
 
+  override def getScheme(): String = "localitytest"
+
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
     require(!file.isDirectory, "The file path can not be a directory.")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 5b2470334477..b40ab4cbda46 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -768,6 +768,7 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
  * access.
  */
 class FailFormatCheckFileSystem extends RawLocalFileSystem {
+  override def getScheme(): String = "failformatcheck"
   override def getFileStatus(f: Path): FileStatus = {
     if (f.getName == FileStreamSink.metadataDir) {
       throw new IOException("cannot access metadata log")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 84cf20ede259..684447ec7c61 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2391,6 +2391,8 @@ class FileStreamSourceStressTestSuite extends 
FileStreamSourceTest {
 class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
   import ExistsThrowsExceptionFileSystem._
 
+  override def getScheme(): String = "existsthrowsexception"
+
   override def getUri: URI = {
     URI.create(s"$scheme:///")
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to