This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a0d27b10fb2 [SPARK-40058][CORE] Avoid filter file path more than once in HadoopFSUtils
a0d27b10fb2 is described below
commit a0d27b10fb21bdefe0cabacca303947cc46d282f
Author: guanziyue <[email protected]>
AuthorDate: Mon Aug 15 08:57:43 2022 -0500
[SPARK-40058][CORE] Avoid filter file path more than once in HadoopFSUtils
### What changes were proposed in this pull request?
Refactor the path filter logic in HadoopFSUtils so that the same filter is not applied to the same file more than once. The method listLeafFiles is called recursively, so files returned by a nested call were filtered again by every enclosing call. In particular, this filter runs in a single thread over all files on the driver side, which causes a performance problem when the filter logic is heavy.
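For context (not part of the commit), here is a minimal standalone sketch of the difference. The names `listBefore`, `listAfter`, and the `recurse` parameter are hypothetical stand-ins for the recursive `listLeafFiles` call; only the filtering structure mirrors the change:

```scala
import org.apache.hadoop.fs.{FileStatus, PathFilter}

object FilterOnceSketch {
  // Shape of the listing before the change: every call filtered its whole
  // result, including nested results that the recursive call had already
  // filtered, so a file N directories deep hit filter.accept N times.
  def listBefore(
      children: Seq[FileStatus],
      recurse: FileStatus => Seq[FileStatus],
      filter: PathFilter): Seq[FileStatus] = {
    val (dirs, files) = children.partition(_.isDirectory)
    val all = files ++ dirs.flatMap(recurse)
    if (filter != null) all.filter(f => filter.accept(f.getPath)) else all
  }

  // Shape of the listing after the change: only this level's own files are
  // filtered; nested results come back already filtered, so each FileStatus
  // goes through filter.accept exactly once.
  def listAfter(
      children: Seq[FileStatus],
      recurse: FileStatus => Seq[FileStatus],
      filter: PathFilter): Seq[FileStatus] = {
    val (dirs, files) = children.partition(_.isDirectory)
    val filteredFiles =
      if (filter != null) files.filter(f => filter.accept(f.getPath)) else files
    filteredFiles ++ dirs.flatMap(recurse)
  }
}
```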
### Why are the changes needed?
So that the filter is applied to each FileStatus only once, as soon as it is first encountered, instead of being re-applied at every level of the recursion.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No test was added, as the change is simple enough.
Closes #37498 from guanziyue/SPARK-40058.
Authored-by: guanziyue <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 60a73adc858..01dc3ba68cc 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -254,7 +254,7 @@ private[spark] object HadoopFSUtils extends Logging {
val allLeafStatuses = {
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
- val nestedFiles: Seq[FileStatus] = contextOpt match {
+ val filteredNestedFiles: Seq[FileStatus] = contextOpt match {
case Some(context) if dirs.size > parallelismThreshold =>
parallelListLeafFilesInternal(
context,
@@ -281,8 +281,12 @@ private[spark] object HadoopFSUtils extends Logging {
parallelismMax = parallelismMax)
}
}
- val allFiles = topLevelFiles ++ nestedFiles
- if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
+ val filteredTopLevelFiles = if (filter != null) {
+ topLevelFiles.filter(f => filter.accept(f.getPath))
+ } else {
+ topLevelFiles
+ }
+ filteredTopLevelFiles ++ filteredNestedFiles
}
val missingFiles = mutable.ArrayBuffer.empty[String]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]