Repository: spark Updated Branches: refs/heads/master 9cbf105ab -> b0cee9605
[SPARK-25062][SQL] Clean up BlockLocations in InMemoryFileIndex ## What changes were proposed in this pull request? `InMemoryFileIndex` contains a cache of `LocatedFileStatus` objects. Each `LocatedFileStatus` object can contain several `BlockLocation` objects, or instances of one of its subclasses. Filling up this cache by listing files happens recursively either on the driver or on the executors, depending on the parallel discovery threshold (`spark.sql.sources.parallelPartitionDiscovery.threshold`). If the listing happens on the executors, the block location objects are converted to plain `BlockLocation` objects to satisfy serialization requirements. If it happens on the driver, there is no conversion and, depending on the file system, a `BlockLocation` object can be an instance of a subclass such as `HdfsBlockLocation` and consume more memory. This PR adds the conversion to the latter case and decreases memory consumption. ## How was this patch tested? Added a unit test. Closes #22603 from peter-toth/SPARK-25062. Authored-by: Peter Toth <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0cee960 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0cee960 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0cee960 Branch: refs/heads/master Commit: b0cee9605e7c71cfd020aa917319478f9ac61bdb Parents: 9cbf105 Author: Peter Toth <[email protected]> Authored: Sat Oct 6 14:50:03 2018 -0700 Committer: Dongjoon Hyun <[email protected]> Committed: Sat Oct 6 14:50:03 2018 -0700 ---------------------------------------------------------------------- .../datasources/InMemoryFileIndex.scala | 9 ++++- .../execution/datasources/FileIndexSuite.scala | 39 +++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/b0cee960/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index dc5c2ff..fe418e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -315,7 +315,14 @@ object InMemoryFileIndex extends Logging { // which is very slow on some file system (RawLocalFileSystem, which is launch a // subprocess and parse the stdout). try { - val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc => + // Store BlockLocation objects to consume less memory + if (loc.getClass == classOf[BlockLocation]) { + loc + } else { + new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength) + } + } val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime, 0, null, null, null, null, f.getPath, locations) if (f.isSymlink) { http://git-wip-us.apache.org/repos/asf/spark/blob/b0cee960/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 18bb4bf..49e7af4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -23,7 +23,7 @@ 
import java.net.URI import scala.collection.mutable import scala.language.reflectiveCalls -import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} +import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem} import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.catalyst.util._ @@ -248,6 +248,26 @@ class FileIndexSuite extends SharedSQLContext { assert(spark.read.parquet(path.getAbsolutePath).schema.exists(_.name == colToUnescape)) } } + + test("SPARK-25062 - InMemoryFileIndex stores BlockLocation objects no matter what subclass " + + "the FS returns") { + withSQLConf("fs.file.impl" -> classOf[SpecialBlockLocationFileSystem].getName) { + withTempDir { dir => + val file = new File(dir, "text.txt") + stringToFile(file, "text") + + val inMemoryFileIndex = new InMemoryFileIndex( + spark, Seq(new Path(file.getCanonicalPath)), Map.empty, None) { + def leafFileStatuses = leafFiles.values + } + val blockLocations = inMemoryFileIndex.leafFileStatuses.flatMap( + _.asInstanceOf[LocatedFileStatus].getBlockLocations) + + assert(blockLocations.forall(_.getClass == classOf[BlockLocation])) + } + } + } + } class FakeParentPathFileSystem extends RawLocalFileSystem { @@ -257,3 +277,20 @@ class FakeParentPathFileSystem extends RawLocalFileSystem { URI.create("mockFs://some-bucket") } } + +class SpecialBlockLocationFileSystem extends RawLocalFileSystem { + + class SpecialBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + extends BlockLocation(names, hosts, offset, length) + + override def getFileBlockLocations( + file: FileStatus, + start: Long, + len: Long): Array[BlockLocation] = { + Array(new SpecialBlockLocation(Array("dummy"), Array("dummy"), 0L, file.getLen)) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
