Repository: spark
Updated Branches:
  refs/heads/master 2dd7f9308 -> e428b3a95


[SPARK-6566] [SQL] Related changes for newer parquet version

This brings in a major improvement: footers are no longer read on the driver.
It also cleans up the code in ParquetTableOperations, where we previously had to
override getSplits to eliminate multiple listStatus calls.
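Concretely, the change relies on Parquet's task-side metadata split strategy, so footers
are read inside the tasks that process each split instead of being collected on the driver
during split planning. A minimal sketch of the setting (the ParquetInputFormat.TASK_SIDE_METADATA
flag is the real parquet-mr key set in the diff below; the standalone Configuration setup
around it is only illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.ParquetInputFormat

    val conf = new Configuration()
    // With task-side metadata enabled, each task reads the footers of the files
    // in its own split; the driver no longer gathers them while computing splits.
    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)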

cc liancheng

Are there any other changes we need for this?

Author: Yash Datta <yash.da...@guavus.com>

Closes #5889 from saucam/parquet_1.6 and squashes the following commits:

d1bf41e [Yash Datta] SPARK-7340: Fix scalastyle and incorporate review comments
c9aa042 [Yash Datta] SPARK-7340: Use the new user defined filter predicate for pushing down inset into parquet
56bc750 [Yash Datta] SPARK-7340: Change parquet version to latest release
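For the InSet pushdown added in c9aa042, the patch introduces a SetInFilter user-defined
predicate and a makeInSet partial function (see the ParquetFilters.scala hunk below). A minimal
sketch of the predicate this produces for an integer column, assuming it runs somewhere
ParquetFilters.SetInFilter is visible; the column name "id" and the value set are made up
for illustration:

    import org.apache.parquet.filter2.compat.FilterCompat
    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.filter2.predicate.FilterApi.intColumn

    // Values an InSet(id, {1, 2, 3}) filter would carry, boxed to match the column type.
    val values: Set[java.lang.Integer] = Set(1, 2, 3).map(Int.box)
    // SetInFilter is the UserDefinedPredicate case class added in this patch;
    // FilterApi.userDefined wires it onto the "id" column.
    val predicate = FilterApi.userDefined(intColumn("id"), SetInFilter(values))
    // Wrapped into the FilterCompat form that parquet-mr's read path consumes.
    val filter = FilterCompat.get(predicate)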


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e428b3a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e428b3a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e428b3a9

Branch: refs/heads/master
Commit: e428b3a951377d47aa80d5f26d6bab979e72e8ab
Parents: 2dd7f93
Author: Yash Datta <yash.da...@guavus.com>
Authored: Fri Jun 12 13:44:09 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Jun 12 13:44:09 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetFilters.scala      |  42 ++++-
 .../sql/parquet/ParquetTableOperations.scala    | 187 +------------------
 2 files changed, 44 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e428b3a9/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 4d659f2..d57b789 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import java.io.Serializable
 import java.nio.ByteBuffer
 
 import com.google.common.io.BaseEncoding
@@ -24,7 +25,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.compat.FilterCompat._
 import org.apache.parquet.filter2.predicate.FilterApi._
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics}
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
 import org.apache.parquet.io.api.Binary
 
 import org.apache.spark.SparkEnv
@@ -42,6 +44,18 @@ private[sql] object ParquetFilters {
     }.reduceOption(FilterApi.and).map(FilterCompat.get)
   }
 
+  case class SetInFilter[T <: Comparable[T]](
+    valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
+
+    override def keep(value: T): Boolean = {
+      value != null && valueSet.contains(value)
+    }
+
+    override def canDrop(statistics: Statistics[T]): Boolean = false
+
+    override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
+  }
+
  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
     case BooleanType =>
      (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -154,6 +168,29 @@ private[sql] object ParquetFilters {
        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
+  private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
+    case LongType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
+    case FloatType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
+    case DoubleType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
+    case StringType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(binaryColumn(n),
+          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[UTF8String].getBytes))))
+    case BinaryType =>
+      (n: String, v: Set[Any]) =>
+        FilterApi.userDefined(binaryColumn(n),
+          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+  }
+
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
@@ -285,6 +322,9 @@ private[sql] object ParquetFilters {
       case Not(pred) =>
         createFilter(pred).map(FilterApi.not)
 
+      case InSet(NamedExpression(name, dataType), valueSet) =>
+        makeInSet.lift(dataType).map(_(name, valueSet))
+
       case _ => None
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e428b3a9/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 1e694f2..272608d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -117,6 +117,9 @@ private[sql] case class ParquetTableScan(
       SQLConf.PARQUET_CACHE_METADATA,
       sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
 
+    // Use task side metadata in parquet
+    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
+
     val baseRDD =
       new org.apache.spark.rdd.NewHadoopRDD(
         sc,
@@ -453,190 +456,6 @@ private[parquet] class FilteringParquetRowInputFormat
     }
   }
 
-  // This is only a temporary solution sicne we need to use fileStatuses in
-  // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
-  // two methods.
-  override def getSplits(jobContext: JobContext): JList[InputSplit] = {
-    // First set fileStatuses.
-    val statuses = listStatus(jobContext)
-    fileStatuses = statuses.map(file => file.getPath -> file).toMap
-
-    super.getSplits(jobContext)
-  }
-
-  // TODO Remove this method and related code once PARQUET-16 is fixed
-  // This method together with the `getFooters` method and the `fileStatuses` field are just used
-  // to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17
-  override def getSplits(
-      configuration: Configuration,
-      footers: JList[Footer]): JList[ParquetInputSplit] = {
-
-    // Use task side strategy by default
-    val taskSideMetaData = configuration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
-    val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
-    val minSplitSize: JLong =
-      Math.max(getFormatMinSplitSize, configuration.getLong("mapred.min.split.size", 0L))
-    if (maxSplitSize < 0 || minSplitSize < 0) {
-      throw new ParquetDecodingException(
-        s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = 
$maxSplitSize;" +
-          s" minSplitSize = $minSplitSize")
-    }
-
-    // Uses strict type checking by default
-    val getGlobalMetaData =
-      classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
-    getGlobalMetaData.setAccessible(true)
-    var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
-
-    if (globalMetaData == null) {
-     val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
-     return splits
-    }
-
-    val metadata = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
-    val mergedMetadata = globalMetaData
-      .getKeyValueMetaData
-      .updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(metadata)))
-
-    globalMetaData = new GlobalMetaData(globalMetaData.getSchema,
-      mergedMetadata, globalMetaData.getCreatedBy)
-
-    val readContext = ParquetInputFormat.getReadSupportInstance(configuration).init(
-      new InitContext(configuration,
-        globalMetaData.getKeyValueMetaData,
-        globalMetaData.getSchema))
-
-    if (taskSideMetaData){
-      logInfo("Using Task Side Metadata Split Strategy")
-      getTaskSideSplits(configuration,
-        footers,
-        maxSplitSize,
-        minSplitSize,
-        readContext)
-    } else {
-      logInfo("Using Client Side Metadata Split Strategy")
-      getClientSideSplits(configuration,
-        footers,
-        maxSplitSize,
-        minSplitSize,
-        readContext)
-    }
-
-  }
-
-  def getClientSideSplits(
-    configuration: Configuration,
-    footers: JList[Footer],
-    maxSplitSize: JLong,
-    minSplitSize: JLong,
-    readContext: ReadContext): JList[ParquetInputSplit] = {
-
-    import org.apache.parquet.filter2.compat.FilterCompat.Filter
-    import org.apache.parquet.filter2.compat.RowGroupFilter
-
-    import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache
-
-    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-
-    val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
-    val filter: Filter = ParquetInputFormat.getFilter(configuration)
-    var rowGroupsDropped: Long = 0
-    var totalRowGroups: Long = 0
-
-    // Ugly hack, stuck with it until PR:
-    // https://github.com/apache/incubator-parquet-mr/pull/17
-    // is resolved
-    val generateSplits =
-      Class.forName("org.apache.parquet.hadoop.ClientSideMetadataSplitStrategy")
-       .getDeclaredMethods.find(_.getName == "generateSplits").getOrElse(
-         sys.error(s"Failed to reflectively invoke 
ClientSideMetadataSplitStrategy.generateSplits"))
-    generateSplits.setAccessible(true)
-
-    for (footer <- footers) {
-      val fs = footer.getFile.getFileSystem(configuration)
-      val file = footer.getFile
-      val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
-      val parquetMetaData = footer.getParquetMetadata
-      val blocks = parquetMetaData.getBlocks
-      totalRowGroups = totalRowGroups + blocks.size
-      val filteredBlocks = RowGroupFilter.filterRowGroups(
-        filter,
-        blocks,
-        parquetMetaData.getFileMetaData.getSchema)
-      rowGroupsDropped = rowGroupsDropped + (blocks.size - filteredBlocks.size)
-
-      if (!filteredBlocks.isEmpty){
-          var blockLocations: Array[BlockLocation] = null
-          if (!cacheMetadata) {
-            blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
-          } else {
-            blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
-              def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
-            })
-          }
-          splits.addAll(
-            generateSplits.invoke(
-              null,
-              filteredBlocks,
-              blockLocations,
-              status,
-              readContext.getRequestedSchema.toString,
-              readContext.getReadSupportMetadata,
-              minSplitSize,
-              maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
-        }
-    }
-
-    if (rowGroupsDropped > 0 && totalRowGroups > 0){
-      val percentDropped = ((rowGroupsDropped/totalRowGroups.toDouble) * 100).toInt
-      logInfo(s"Dropping $rowGroupsDropped row groups that do not pass filter predicate "
-        + s"($percentDropped %) !")
-    }
-    else {
-      logInfo("There were no row groups that could be dropped due to filter 
predicates")
-    }
-    splits
-
-  }
-
-  def getTaskSideSplits(
-    configuration: Configuration,
-    footers: JList[Footer],
-    maxSplitSize: JLong,
-    minSplitSize: JLong,
-    readContext: ReadContext): JList[ParquetInputSplit] = {
-
-    val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
-
-    // Ugly hack, stuck with it until PR:
-    // https://github.com/apache/incubator-parquet-mr/pull/17
-    // is resolved
-    val generateSplits =
-      Class.forName("org.apache.parquet.hadoop.TaskSideMetadataSplitStrategy")
-       .getDeclaredMethods.find(_.getName == "generateTaskSideMDSplits").getOrElse(
-         sys.error(
-           s"Failed to reflectively invoke TaskSideMetadataSplitStrategy.generateTaskSideMDSplits"))
-    generateSplits.setAccessible(true)
-
-    for (footer <- footers) {
-      val file = footer.getFile
-      val fs = file.getFileSystem(configuration)
-      val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
-      val blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
-      splits.addAll(
-        generateSplits.invoke(
-         null,
-         blockLocations,
-         status,
-         readContext.getRequestedSchema.toString,
-         readContext.getReadSupportMetadata,
-         minSplitSize,
-         maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
-    }
-
-    splits
-  }
-
 }
 
 private[parquet] object FilteringParquetRowInputFormat {

