Repository: spark
Updated Branches:
  refs/heads/master 6bca8898a -> 9eb74c7d2


[SPARK-3091] [SQL] Add support for caching metadata on Parquet files

For larger Parquet files, reading the file footers (which is done in parallel 
on up to 5 threads) and HDFS block locations (which is serial) can take 
multiple seconds. We can add an option to cache this data within 
FilteringParquetRowInputFormat. Unfortunately ParquetInputFormat only caches 
footers within each instance of ParquetInputFormat, not across them.
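
As a rough usage sketch (not part of this patch), the option can be enabled on
a SQLContext before scanning Parquet data. The key matches the new
SQLConf.PARQUET_CACHE_METADATA constant added below; the master URL and input
path here are placeholders:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    object ParquetCacheExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "parquet-cache-example")
        val sqlContext = new SQLContext(sc)

        // Off by default in 1.1; turn it on explicitly for this session
        sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")

        // Repeated scans of the same files can now reuse cached footers
        // and block locations instead of re-reading them from HDFS
        val data = sqlContext.parquetFile("/path/to/parquet/table")
        data.registerTempTable("records")
        sqlContext.sql("SELECT COUNT(*) FROM records").collect().foreach(println)

        sc.stop()
      }
    }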

Note: this PR leaves the option turned off by default for 1.1, but I believe 
it's safe to turn it on afterwards. The cache keys are FileStatus objects that 
include a modification time, so the caches stay correct if files are modified. 
The location cache could become invalid if files have moved within HDFS, but 
that's rare, so I just made it invalidate entries every 15 minutes.
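
For illustration only, here is a minimal standalone sketch of the Guava cache
pattern described above, with plain String keys standing in for FileStatus and
Footer (the real caches live in the FilteringParquetRowInputFormat companion
object in the diff below):

    import java.util.concurrent.{Callable, TimeUnit}

    import com.google.common.cache.CacheBuilder

    object MetadataCacheSketch {
      // Footer-style cache: bounded in size, keyed by file identity
      private val footerCache = CacheBuilder.newBuilder()
        .maximumSize(20000)
        .build[String, String]()

      // Location-style cache: additionally expires entries after 15 minutes,
      // since block locations can go stale if HDFS moves a file
      private val locationCache = CacheBuilder.newBuilder()
        .maximumSize(20000)
        .expireAfterWrite(15, TimeUnit.MINUTES)
        .build[String, Array[String]]()

      def main(args: Array[String]): Unit = {
        // get(key, loader) computes and caches the value on a miss and
        // returns the cached value on later calls with the same key
        val locations = locationCache.get("part-00000.parquet",
          new Callable[Array[String]] {
            def call(): Array[String] = Array("host1", "host2") // stand-in for an HDFS lookup
          })
        println(locations.mkString(", "))

        footerCache.put("part-00000.parquet", "footer-metadata")
        println(footerCache.getIfPresent("part-00000.parquet"))
      }
    }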

Author: Matei Zaharia <[email protected]>

Closes #2005 from mateiz/parquet-cache and squashes the following commits:

dae8efe [Matei Zaharia] Bug fix
c71e9ed [Matei Zaharia] Handle empty statuses directly
22072b0 [Matei Zaharia] Use Guava caches and add a config option for caching metadata
8fb56ce [Matei Zaharia] Cache file block locations too
453bd21 [Matei Zaharia] Bug fix
4094df6 [Matei Zaharia] First attempt at caching Parquet footers


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9eb74c7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9eb74c7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9eb74c7d

Branch: refs/heads/master
Commit: 9eb74c7d2cbe127dd4c32bf1a8318497b2fb55b6
Parents: 6bca889
Author: Matei Zaharia <[email protected]>
Authored: Mon Aug 18 11:00:10 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Mon Aug 18 11:00:10 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  1 +
 .../sql/parquet/ParquetTableOperations.scala    | 84 +++++++++++++++++---
 2 files changed, 72 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9eb74c7d/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 56face2..4f2adb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -32,6 +32,7 @@ private[spark] object SQLConf {
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
+  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"

http://git-wip-us.apache.org/repos/asf/spark/blob/9eb74c7d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 759a2a5..c6dca10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -17,22 +17,23 @@
 
 package org.apache.spark.sql.parquet
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.Try
-
 import java.io.IOException
 import java.lang.{Long => JLong}
 import java.text.SimpleDateFormat
-import java.util.{Date, List => JList}
+import java.util.concurrent.{Callable, TimeUnit}
+import java.util.{ArrayList, Collections, Date, List => JList}
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.Try
+
+import com.google.common.cache.CacheBuilder
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
 import parquet.hadoop._
 import parquet.hadoop.api.{InitContext, ReadSupport}
 import parquet.hadoop.metadata.GlobalMetaData
@@ -41,7 +42,7 @@ import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -96,6 +97,11 @@ case class ParquetTableScan(
       ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
     }
 
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.set(
+      SQLConf.PARQUET_CACHE_METADATA,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
+
     sc.newAPIHadoopRDD(
       conf,
       classOf[FilteringParquetRowInputFormat],
@@ -323,10 +329,40 @@ private[parquet] class FilteringParquetRowInputFormat
   }
 
   override def getFooters(jobContext: JobContext): JList[Footer] = {
+    import FilteringParquetRowInputFormat.footerCache
+
     if (footers eq null) {
+      val conf = ContextUtil.getConfiguration(jobContext)
+      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
       val statuses = listStatus(jobContext)
       fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
+      if (statuses.isEmpty) {
+        footers = Collections.emptyList[Footer]
+      } else if (!cacheMetadata) {
+        // Read the footers from HDFS
+        footers = getFooters(conf, statuses)
+      } else {
+        // Read only the footers that are not in the footerCache
+        val foundFooters = footerCache.getAllPresent(statuses)
+        val toFetch = new ArrayList[FileStatus]
+        for (s <- statuses) {
+          if (!foundFooters.containsKey(s)) {
+            toFetch.add(s)
+          }
+        }
+        val newFooters = new mutable.HashMap[FileStatus, Footer]
+        if (toFetch.size > 0) {
+          val fetched = getFooters(conf, toFetch)
+          for ((status, i) <- toFetch.zipWithIndex) {
+            newFooters(status) = fetched.get(i)
+          }
+          footerCache.putAll(newFooters)
+        }
+        footers = new ArrayList[Footer](statuses.size)
+        for (status <- statuses) {
+          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
+        }
+      }
     }
 
     footers
@@ -339,6 +375,10 @@ private[parquet] class FilteringParquetRowInputFormat
       configuration: Configuration,
       footers: JList[Footer]): JList[ParquetInputSplit] = {
 
+    import FilteringParquetRowInputFormat.blockLocationCache
+
+    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+
     val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
     val minSplitSize: JLong =
       Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
@@ -366,16 +406,23 @@ private[parquet] class FilteringParquetRowInputFormat
     for (footer <- footers) {
       val fs = footer.getFile.getFileSystem(configuration)
       val file = footer.getFile
-      val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
+      val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
       val parquetMetaData = footer.getParquetMetadata
       val blocks = parquetMetaData.getBlocks
-      val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
+      var blockLocations: Array[BlockLocation] = null
+      if (!cacheMetadata) {
+        blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
+      } else {
+        blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
+          def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
+        })
+      }
       splits.addAll(
         generateSplits.invoke(
           null,
           blocks,
-          fileBlockLocations,
-          fileStatus,
+          blockLocations,
+          status,
           parquetMetaData.getFileMetaData,
           readContext.getRequestedSchema.toString,
           readContext.getReadSupportMetadata,
@@ -387,6 +434,17 @@ private[parquet] class FilteringParquetRowInputFormat
   }
 }
 
+private[parquet] object FilteringParquetRowInputFormat {
+  private val footerCache = CacheBuilder.newBuilder()
+    .maximumSize(20000)
+    .build[FileStatus, Footer]()
+
+  private val blockLocationCache = CacheBuilder.newBuilder()
+    .maximumSize(20000)
+    .expireAfterWrite(15, TimeUnit.MINUTES)  // Expire locations since HDFS files might move
+    .build[FileStatus, Array[BlockLocation]]()
+}
+
 private[parquet] object FileSystemHelper {
   def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)

