This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4e4e08f [SPARK-31047][SQL] Improve file listing for ViewFileSystem
4e4e08f is described below
commit 4e4e08f372db888797fd23faca88ac02d9466d5a
Author: manuzhang <[email protected]>
AuthorDate: Tue Mar 17 14:23:28 2020 -0700
[SPARK-31047][SQL] Improve file listing for ViewFileSystem
### What changes were proposed in this pull request?
Use `listLocatedStatus` when `InMemoryFileIndex` is listing files from a
`ViewFileSystem`, which should delegate to that of `DistributedFileSystem`.
### Why are the changes needed?
When `ViewFileSystem` is used to manage several `DistributedFileSystem` instances,
this change will improve the performance of file listing, especially when there are
many files.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #27801 from manuzhang/spark-31047.
Authored-by: manuzhang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../execution/datasources/InMemoryFileIndex.scala | 3 ++-
.../sql/execution/datasources/FileIndexSuite.scala | 25 +++++++++++++++++++++-
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index cac2d6e..84160f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
@@ -313,7 +314,7 @@ object InMemoryFileIndex extends Logging {
// to retrieve the file status with the file block location. The reason to still fallback
// to listStatus is because the default implementation would potentially throw a
// FileNotFoundException which is better handled by doing the lookups manually below.
-      case _: DistributedFileSystem if !ignoreLocality =>
+      case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
def next(): LocatedFileStatus = remoteIter.next
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 553773e..ea15f18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -22,7 +22,11 @@ import java.net.URI
import scala.collection.mutable
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator}
+import org.apache.hadoop.fs.viewfs.ViewFileSystem
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, when}
import org.apache.spark.SparkException
import org.apache.spark.metrics.source.HiveCatalogMetrics
@@ -465,6 +469,25 @@ class FileIndexSuite extends SharedSparkSession {
}
}
}
+
+ test("SPARK-31047 - Improve file listing for ViewFileSystem") {
+ val path = mock(classOf[Path])
+ val dfs = mock(classOf[ViewFileSystem])
+ when(path.getFileSystem(any[Configuration])).thenReturn(dfs)
+ val statuses =
+ Seq(
+ new LocatedFileStatus(
+ new FileStatus(0, false, 0, 100, 0,
+ new Path("file")), Array(new BlockLocation()))
+ )
+    when(dfs.listLocatedStatus(path)).thenReturn(new RemoteIterator[LocatedFileStatus] {
+ val iter = statuses.toIterator
+ override def hasNext: Boolean = iter.hasNext
+ override def next(): LocatedFileStatus = iter.next
+ })
+ val fileIndex = new TestInMemoryFileIndex(spark, path)
+ assert(fileIndex.leafFileStatuses.toSeq == statuses)
+ }
}
object DeletionRaceFileSystem {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]