Repository: spark
Updated Branches:
  refs/heads/master 54f758b5f -> bba5d7999


[SPARK-15639][SQL] Try to push down filter at RowGroups level for parquet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used by the vectorized Parquet 
reader, tries to get pushed-down filters from the given configuration. These 
pushed-down filters are used for RowGroups-level filtering. However, we never 
set the filters to be pushed down in that configuration. In other words, the 
filters are not actually pushed down to do RowGroups-level filtering. This 
patch fixes that by setting the pushed-down filters in the configuration 
that is passed to the reader.

## How was this patch tested?
Existing tests should pass.

Author: Liang-Chi Hsieh <[email protected]>

Closes #13371 from viirya/vectorized-reader-push-down-filter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bba5d799
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bba5d799
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bba5d799

Branch: refs/heads/master
Commit: bba5d7999f7b3ae9d816ea552ba9378fea1615a6
Parents: 54f758b
Author: Liang-Chi Hsieh <[email protected]>
Authored: Fri Jun 10 18:23:59 2016 -0700
Committer: Cheng Lian <[email protected]>
Committed: Fri Jun 10 18:23:59 2016 -0700

----------------------------------------------------------------------
 .../catalyst/expressions/namedExpressions.scala |  8 +++
 .../datasources/FileSourceStrategy.scala        |  9 ++-
 .../datasources/parquet/ParquetFileFormat.scala | 61 ++------------------
 3 files changed, 21 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 306a99d..c06a1ea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -292,6 +292,14 @@ case class AttributeReference(
     }
   }
 
+  def withMetadata(newMetadata: Metadata): AttributeReference = {
+    if (metadata == newMetadata) {
+      this
+    } else {
+      AttributeReference(name, dataType, nullable, newMetadata)(exprId, 
qualifier, isGenerated)
+    }
+  }
+
   override protected final def otherCopyArgs: Seq[AnyRef] = {
     exprId :: qualifier :: isGenerated :: Nil
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bf..7fc842f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with 
Logging {
       logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
-        l.resolve(files.dataSchema, 
files.sparkSession.sessionState.analyzer.resolver)
+        l.resolve(files.dataSchema, 
files.sparkSession.sessionState.analyzer.resolver).map { c =>
+          files.dataSchema.find(_.name == c.name).map { f =>
+            c match {
+              case a: AttributeReference => a.withMetadata(f.metadata)
+              case _ => c
+            }
+          }.getOrElse(c)
+        }
 
       // Partition keys are not available in the statistics of the files.
       val dataFilters = 
normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/bba5d799/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3735c94..bc4a9de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -357,6 +357,11 @@ private[sql] class ParquetFileFormat
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, 
attemptId)
 
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      pushed.foreach {
+        
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
+      }
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
         vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -592,62 +597,6 @@ private[sql] object ParquetFileFormat extends Logging {
     }
   }
 
-  /** This closure sets various Parquet configurations at both driver side and 
executor side. */
-  private[parquet] def initializeLocalJobFunc(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      dataSchema: StructType,
-      parquetBlockSize: Long,
-      useMetadataCache: Boolean,
-      parquetFilterPushDown: Boolean,
-      assumeBinaryIsString: Boolean,
-      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
-    val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[CatalystReadSupport].getName)
-
-    // Try to push down filters when filter push-down is enabled.
-    if (parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
-    })
-
-    conf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
-    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
-    // Sets flags for `CatalystSchemaConverter`
-    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
-    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, 
assumeInt96IsTimestamp)
-
-    overrideMinSplitSize(parquetBlockSize, conf)
-  }
-
-  /** This closure sets input paths at the driver side. */
-  private[parquet] def initializeDriverSideJobFunc(
-      inputFiles: Array[FileStatus],
-      parquetBlockSize: Long)(job: Job): Unit = {
-    // We side the input paths at the driver side.
-    logInfo(s"Reading Parquet file(s) from 
${inputFiles.map(_.getPath).mkString(", ")}")
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-
-    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
-  }
-
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to