Repository: spark
Updated Branches:
  refs/heads/master 9cbf105ab -> b0cee9605


[SPARK-25062][SQL] Clean up BlockLocations in InMemoryFileIndex

## What changes were proposed in this pull request?

`InMemoryFileIndex` contains a cache of `LocatedFileStatus` objects. Each 
`LocatedFileStatus` object can contain several `BlockLocation`s or instances of 
some subclass of it. This cache is filled by listing files recursively, 
either on the driver or on the executors, depending on the parallel discovery 
threshold (`spark.sql.sources.parallelPartitionDiscovery.threshold`). If the 
listing happens on the executors, the block location objects are converted to 
plain `BlockLocation` objects to satisfy serialization requirements. If it 
happens on the driver, there is no conversion, and depending on the file system 
a `BlockLocation` object can be a subclass such as `HdfsBlockLocation` and 
consume more memory. This PR adds the same conversion to the driver-side case, 
which decreases memory consumption.

## How was this patch tested?

Added a unit test to `FileIndexSuite`.

Closes #22603 from peter-toth/SPARK-25062.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0cee960
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0cee960
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0cee960

Branch: refs/heads/master
Commit: b0cee9605e7c71cfd020aa917319478f9ac61bdb
Parents: 9cbf105
Author: Peter Toth <[email protected]>
Authored: Sat Oct 6 14:50:03 2018 -0700
Committer: Dongjoon Hyun <[email protected]>
Committed: Sat Oct 6 14:50:03 2018 -0700

----------------------------------------------------------------------
 .../datasources/InMemoryFileIndex.scala         |  9 ++++-
 .../execution/datasources/FileIndexSuite.scala  | 39 +++++++++++++++++++-
 2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b0cee960/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index dc5c2ff..fe418e6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -315,7 +315,14 @@ object InMemoryFileIndex extends Logging {
         // which is very slow on some file system (RawLocalFileSystem, which 
is launch a
         // subprocess and parse the stdout).
         try {
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
+            // Store BlockLocation objects to consume less memory
+            if (loc.getClass == classOf[BlockLocation]) {
+              loc
+            } else {
+              new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, 
loc.getLength)
+            }
+          }
           val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, 
f.getReplication, f.getBlockSize,
             f.getModificationTime, 0, null, null, null, null, f.getPath, 
locations)
           if (f.isSymlink) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b0cee960/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 18bb4bf..49e7af4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -23,7 +23,7 @@ import java.net.URI
 import scala.collection.mutable
 import scala.language.reflectiveCalls
 
-import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, 
Path, RawLocalFileSystem}
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
@@ -248,6 +248,26 @@ class FileIndexSuite extends SharedSQLContext {
       assert(spark.read.parquet(path.getAbsolutePath).schema.exists(_.name == 
colToUnescape))
     }
   }
+
+  test("SPARK-25062 - InMemoryFileIndex stores BlockLocation objects no matter 
what subclass " +
+    "the FS returns") {
+    withSQLConf("fs.file.impl" -> 
classOf[SpecialBlockLocationFileSystem].getName) {
+      withTempDir { dir =>
+        val file = new File(dir, "text.txt")
+        stringToFile(file, "text")
+
+        val inMemoryFileIndex = new InMemoryFileIndex(
+          spark, Seq(new Path(file.getCanonicalPath)), Map.empty, None) {
+          def leafFileStatuses = leafFiles.values
+        }
+        val blockLocations = inMemoryFileIndex.leafFileStatuses.flatMap(
+          _.asInstanceOf[LocatedFileStatus].getBlockLocations)
+
+        assert(blockLocations.forall(_.getClass == classOf[BlockLocation]))
+      }
+    }
+  }
+
 }
 
 class FakeParentPathFileSystem extends RawLocalFileSystem {
@@ -257,3 +277,20 @@ class FakeParentPathFileSystem extends RawLocalFileSystem {
     URI.create("mockFs://some-bucket")
   }
 }
+
+class SpecialBlockLocationFileSystem extends RawLocalFileSystem {
+
+  class SpecialBlockLocation(
+      names: Array[String],
+      hosts: Array[String],
+      offset: Long,
+      length: Long)
+    extends BlockLocation(names, hosts, offset, length)
+
+  override def getFileBlockLocations(
+      file: FileStatus,
+      start: Long,
+      len: Long): Array[BlockLocation] = {
+    Array(new SpecialBlockLocation(Array("dummy"), Array("dummy"), 0L, 
file.getLen))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to