This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 24b82dfd6cfb [SPARK-45452][SQL] Improve `InMemoryFileIndex` to use
`FileSystem.listFiles` API
24b82dfd6cfb is described below
commit 24b82dfd6cfb9a658af615446be5423695830dd9
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Oct 7 16:52:33 2023 -0700
[SPARK-45452][SQL] Improve `InMemoryFileIndex` to use
`FileSystem.listFiles` API
### What changes were proposed in this pull request?
This PR aims to improve `InMemoryFileIndex` to use the `FileSystem.listFiles` API
instead of making many recursive FileSystem calls.
To do this, a new configuration is introduced with the default value, `s3a`.
```
spark.sql.sources.useListFilesFileSystemList=s3a
```
### Why are the changes needed?
To improve the nested directory listing performance. Currently, this optimization is
designed to apply to the single-root-directory listing case on the S3A file system.
For example, the improvement on a 3-year-data table with `year/month/day/hour`
hierarchy is **158x**.
**3-YEAR PARTITIONED TABLE (2021 ~ 2023)**
```
$ bin/spark-sql
spark-sql (default)> CREATE TABLE t USING CSV PARTITIONED BY (year, month,
day, hour) LOCATION "file:///tmp/three_year" AS SELECT /*+ REPARTITION(64,
year, month, day, hour) */ Y.id year, M.id month, D.id day, H.id hour, 0 value
FROM RANGE(2021, 2024) Y, RANGE(1, 13) M, RANGE(1, 31) D, RANGE(0, 24) H;
$ aws s3 sync three_year s3://dongjoon/three_year
$ aws s3 ls s3://dongjoon/three_year --summarize --recursive | more
2023-10-07 01:07:00 8 three_year/._SUCCESS.crc
2023-10-07 01:07:00 0 three_year/_SUCCESS
2023-10-07 01:07:00 12
three_year/year=2021/month=1/day=1/hour=0/.part-00033-264e1d55-d466-43c8-8972-283a1f84082b.c000.csv.crc
2023-10-07 01:07:00 2
three_year/year=2021/month=1/day=1/hour=0/part-00033-264e1d55-d466-43c8-8972-283a1f84082b.c000.csv
...
$ aws s3 ls s3://dongjoon/three_year --summarize --recursive | tail -n2
Total Objects: 51842
Total Size: 362888
```
**AFTER (11s)**
```
$ bin/spark-shell \
-c
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider
...
scala> spark.time(new
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(spark, Seq(new
org.apache.hadoop.fs.Path("s3a://dongjoon/three_year")), Map.empty, None))
Time taken: 11270 ms
```
**BEFORE (1741s)**
- At this time, I used one year of data to estimate the 3-year result. I'll update with
the full 3-year result soon.
```
$ bin/spark-shell \
-c
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider
\
-c spark.sql.sources.useListFilesFileSystemList=none
...
scala> spark.time(new
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(spark, Seq(new
org.apache.hadoop.fs.Path("s3a://dongjoon/three_year")), Map.empty, None))
Time taken: 1741315 ms
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43261 from dongjoon-hyun/SPARK-45452.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/util/HadoopFSUtils.scala | 42 ++++++++++++++++++++++
.../org/apache/spark/sql/internal/SQLConf.scala | 12 +++++++
.../execution/datasources/InMemoryFileIndex.scala | 32 +++++++++++------
.../spark/sql/execution/command/DDLSuite.scala | 2 ++
.../sql/execution/datasources/FileIndexSuite.scala | 41 +++++++++++++++++++++
.../datasources/FileSourceStrategySuite.scala | 2 ++
.../spark/sql/streaming/FileStreamSinkSuite.scala | 1 +
.../sql/streaming/FileStreamSourceSuite.scala | 2 ++
8 files changed, 124 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index db1eeeef4712..eb0a79c7c69b 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -69,6 +69,38 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
}
+ /**
+ * Lists a collection of paths recursively with a single API invocation.
+ * Like parallelListLeafFiles, this ignores FileNotFoundException on the
given root path.
+ *
+ * This is able to be called on both driver and executors.
+ *
+ * @param path a path to list
+ * @param hadoopConf Hadoop configuration
+ * @param filter Path filter used to exclude leaf files from result
+ * @return the set of discovered files for the path
+ */
+ def listFiles(
+ path: Path,
+ hadoopConf: Configuration,
+ filter: PathFilter): Seq[(Path, Seq[FileStatus])] = {
+ logInfo(s"Listing $path with listFiles API")
+ try {
+ val remoteIter = path.getFileSystem(hadoopConf).listFiles(path, true)
+ val statues = new Iterator[LocatedFileStatus]() {
+ def next(): LocatedFileStatus = remoteIter.next
+ def hasNext(): Boolean = remoteIter.hasNext
+ }.filterNot(status => shouldFilterOutPath(status.getPath.toString))
+ .filter(f => filter.accept(f.getPath))
+ .toArray
+ Seq((path, statues))
+ } catch {
+ case _: FileNotFoundException =>
+ logWarning(s"The root directory $path was not found. Was it deleted
very recently?")
+ Seq((path, Array.empty[FileStatus]))
+ }
+ }
+
private def parallelListLeafFilesInternal(
sc: SparkContext,
paths: Seq[Path],
@@ -309,4 +341,14 @@ private[spark] object HadoopFSUtils extends Logging {
val include = pathName.startsWith("_common_metadata") ||
pathName.startsWith("_metadata")
exclude && !include
}
+
+ /** Checks if we should filter out this path. */
+ def shouldFilterOutPath(path: String): Boolean = {
+ val exclude = (path.contains("/_") && !path.contains("=")) ||
path.contains("/.")
+ val include = path.contains("/_common_metadata/") ||
+ path.endsWith("/_common_metadata") ||
+ path.contains("/_metadata/") ||
+ path.endsWith("/_metadata")
+ (exclude && !include) || shouldFilterOutPathName(path)
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index db6cc2736c9e..65d2e6136e9f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1641,6 +1641,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val USE_LISTFILES_FILESYSTEM_LIST =
+ buildConf("spark.sql.sources.useListFilesFileSystemList")
+ .doc("A comma-separated list of file system schemes to use
FileSystem.listFiles API " +
+ "for a single root path listing")
+ .version("4.0.0")
+ .internal()
+ .stringConf
+ .transform(_.toLowerCase(Locale.ROOT))
+ .createWithDefault("s3a")
+
// Whether to automatically resolve ambiguity in join conditions for
self-joins.
// See SPARK-6231.
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -5257,6 +5267,8 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)
+ def useListFilesFileSystemList: String =
getConf(SQLConf.USE_LISTFILES_FILESYSTEM_LIST)
+
def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 4d917994123f..3335454023b6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -149,16 +149,28 @@ object InMemoryFileIndex extends Logging {
filter: PathFilter,
sparkSession: SparkSession,
parameters: Map[String, String] = Map.empty): Seq[(Path,
Seq[FileStatus])] = {
- HadoopFSUtils.parallelListLeafFiles(
- sc = sparkSession.sparkContext,
- paths = paths,
- hadoopConf = hadoopConf,
- filter = new PathFilterWrapper(filter),
- ignoreMissingFiles =
- new
FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreMissingFiles,
- ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
- parallelismThreshold =
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
- parallelismMax =
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
+ val fileSystemList =
+
sparkSession.sessionState.conf.useListFilesFileSystemList.split(",").map(_.trim)
+ val ignoreMissingFiles =
+ new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreMissingFiles
+ val useListFiles = paths.size == 1 &&
+ fileSystemList.contains(paths.head.getFileSystem(hadoopConf).getScheme)
+ if (useListFiles) {
+ HadoopFSUtils.listFiles(
+ path = paths.head,
+ hadoopConf = hadoopConf,
+ filter = new PathFilterWrapper(filter))
+ } else {
+ HadoopFSUtils.parallelListLeafFiles(
+ sc = sparkSession.sparkContext,
+ paths = paths,
+ hadoopConf = hadoopConf,
+ filter = new PathFilterWrapper(filter),
+ ignoreMissingFiles = ignoreMissingFiles,
+ ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
+ parallelismThreshold =
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
+ parallelismMax =
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
+ }
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0692a794f5d5..b037aaac15e5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2417,6 +2417,8 @@ object FakeLocalFsFileSystem {
class FakeLocalFsFileSystem extends RawLocalFileSystem {
import FakeLocalFsFileSystem._
+ override def getScheme(): String = "fakelocalfs"
+
override def delete(f: Path, recursive: Boolean): Boolean = {
aclStatus = new AclStatus.Builder().build()
super.delete(f, recursive)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 9ac61f0cee5f..b0193813bdfd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -285,6 +285,22 @@ class FileIndexSuite extends SharedSparkSession {
}
}
+ test("SPARK-45452: PartitioningAwareFileIndex uses listFiles API for large
child dirs") {
+ withSQLConf(SQLConf.USE_LISTFILES_FILESYSTEM_LIST.key -> "file") {
+ for (scale <- Seq(10, 50)) {
+ withTempDir { dir =>
+ for (i <- 1 to scale) {
+ new File(dir, s"foo=$i.txt").mkdir()
+ }
+ HiveCatalogMetrics.reset()
+
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)),
Map.empty, None)
+
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ }
+ }
+ }
+ }
+
test("PartitioningAwareFileIndex listing parallelized with large, deeply
nested child dirs") {
for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
withTempDir { dir =>
@@ -307,6 +323,31 @@ class FileIndexSuite extends SharedSparkSession {
}
}
+ test("SPARK-45452: PartitioningAwareFileIndex listing parallelized with
large, deeply nested " +
+ "child dirs") {
+ withSQLConf(SQLConf.USE_LISTFILES_FILESYSTEM_LIST.key -> "file") {
+ for (scale <- Seq(10, 50)) {
+ withTempDir { dir =>
+ for (i <- 1 to 2) {
+ val subdirA = new File(dir, s"a=$i")
+ subdirA.mkdir()
+ for (j <- 1 to 2) {
+ val subdirB = new File(subdirA, s"b=$j")
+ subdirB.mkdir()
+ for (k <- 1 to scale) {
+ new File(subdirB, s"foo=$k.txt").mkdir()
+ }
+ }
+ }
+ HiveCatalogMetrics.reset()
+
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)),
Map.empty, None)
+
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ }
+ }
+ }
+ }
+
test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
class MockCatalog(
override val rootPaths: Seq[Path])
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 91182f6473d7..6527b47211b3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -774,6 +774,8 @@ class TestFileFormat extends TextBasedFileFormat {
class LocalityTestFileSystem extends RawLocalFileSystem {
private val invocations = new AtomicInteger(0)
+ override def getScheme(): String = "localitytest"
+
override def getFileBlockLocations(
file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
require(!file.isDirectory, "The file path can not be a directory.")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 5b2470334477..b40ab4cbda46 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -768,6 +768,7 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
* access.
*/
class FailFormatCheckFileSystem extends RawLocalFileSystem {
+ override def getScheme(): String = "failformatcheck"
override def getFileStatus(f: Path): FileStatus = {
if (f.getName == FileStreamSink.metadataDir) {
throw new IOException("cannot access metadata log")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 84cf20ede259..684447ec7c61 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2391,6 +2391,8 @@ class FileStreamSourceStressTestSuite extends
FileStreamSourceTest {
class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
import ExistsThrowsExceptionFileSystem._
+ override def getScheme(): String = "existsthrowsexception"
+
override def getUri: URI = {
URI.create(s"$scheme:///")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]