This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4e4e08f [SPARK-31047][SQL] Improve file listing for ViewFileSystem 4e4e08f is described below commit 4e4e08f372db888797fd23faca88ac02d9466d5a Author: manuzhang <owenzhang1...@gmail.com> AuthorDate: Tue Mar 17 14:23:28 2020 -0700 [SPARK-31047][SQL] Improve file listing for ViewFileSystem ### What changes were proposed in this pull request? Use `listLocatedStatus` when `InMemoryFileIndex` is listing files from a `ViewFileSystem` which should delegate to that of `DistributedFileSystem`. ### Why are the changes needed? When `ViewFileSystem` is used to manage several `DistributedFileSystem`, the change will improve performance of file listing, especially when there are many files. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing tests. Closes #27801 from manuzhang/spark-31047. Authored-by: manuzhang <owenzhang1...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../execution/datasources/InMemoryFileIndex.scala | 3 ++- .../sql/execution/datasources/FileIndexSuite.scala | 25 +++++++++++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index cac2d6e..84160f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -23,6 +23,7 @@ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapred.{FileInputFormat, JobConf} @@ -313,7 +314,7 @@ object InMemoryFileIndex extends Logging { // to retrieve the file status with 
the file block location. The reason to still fallback // to listStatus is because the default implementation would potentially throw a // FileNotFoundException which is better handled by doing the lookups manually below. - case _: DistributedFileSystem if !ignoreLocality => + case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality => val remoteIter = fs.listLocatedStatus(path) new Iterator[LocatedFileStatus]() { def next(): LocatedFileStatus = remoteIter.next diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 553773e..ea15f18 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -22,7 +22,11 @@ import java.net.URI import scala.collection.mutable -import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator} +import org.apache.hadoop.fs.viewfs.ViewFileSystem +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{mock, when} import org.apache.spark.SparkException import org.apache.spark.metrics.source.HiveCatalogMetrics @@ -465,6 +469,25 @@ class FileIndexSuite extends SharedSparkSession { } } } + + test("SPARK-31047 - Improve file listing for ViewFileSystem") { + val path = mock(classOf[Path]) + val dfs = mock(classOf[ViewFileSystem]) + when(path.getFileSystem(any[Configuration])).thenReturn(dfs) + val statuses = + Seq( + new LocatedFileStatus( + new FileStatus(0, false, 0, 100, 0, + new Path("file")), Array(new BlockLocation())) + ) + when(dfs.listLocatedStatus(path)).thenReturn(new RemoteIterator[LocatedFileStatus] { + val iter = statuses.toIterator + 
override def hasNext: Boolean = iter.hasNext + override def next(): LocatedFileStatus = iter.next + }) + val fileIndex = new TestInMemoryFileIndex(spark, path) + assert(fileIndex.leafFileStatuses.toSeq == statuses) + } } object DeletionRaceFileSystem { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org