Repository: spark
Updated Branches:
refs/heads/master 480357cc6 -> 4f8ceed59
[SPARK-16371][SQL] Do not push down filters incorrectly when inner name and
outer name are the same in Parquet
## What changes were proposed in this pull request?
Currently, if there is a schema as below:
```
root
|-- _1: struct (nullable = true)
| |-- _1: integer (nullable = true)
```
and we execute the code below:
```scala
df.filter("_1 IS NOT NULL").count()
```
This pushes down a filter even though the filter is applied to a
`StructType` column. (If my understanding is correct, Spark does not push down
filters for those.)
The reason is that `ParquetFilters.getFieldMap` produces the results below:
```
(_1,StructType(StructField(_1,IntegerType,true)))
(_1,IntegerType)
```
and then it becomes a `Map`, in which the later entry for the duplicate key
`_1` overwrites the earlier one:
```
(_1,IntegerType)
```
Now, because of ` ....lift(dataTypeOf(name)).map(_(name, value))`, a filter is
pushed down for `_1`, which Parquet thinks is `IntegerType`. However, the outer
column `_1` is actually `StructType`.
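The collision can be reproduced without Spark. The following is a minimal sketch, assuming simplified stand-in types and a hypothetical `makeNotNull` filter builder (neither is Spark's real API); it only illustrates how `toMap` drops the struct entry and how the `lift(...)` lookup then succeeds for the wrong type.

```scala
object FieldMapCollision {
  // Simplified stand-ins for Spark's data types (hypothetical, for illustration only).
  sealed trait DataType
  case object IntegerType extends DataType
  final case class StructType(fields: List[(String, DataType)]) extends DataType

  // The flattened pairs in the order listed above: outer struct first, inner integer second.
  val flattened: Array[(String, DataType)] = Array(
    "_1" -> StructType(List("_1" -> IntegerType)),
    "_1" -> IntegerType
  )

  // `toMap` keeps the last binding for a duplicate key, so the struct entry is lost.
  val dataTypeOf: Map[String, DataType] = flattened.toMap

  // Hypothetical filter builder, defined only for primitive types.
  val makeNotNull: PartialFunction[DataType, String => String] = {
    case IntegerType => name => s"notEq($name, null)"
  }

  // Mirrors the `lift(dataTypeOf(name)).map(...)` shape: a defined type yields Some(filter).
  def pushedDown(name: String): Option[String] =
    makeNotNull.lift(dataTypeOf(name)).map(_(name))

  def main(args: Array[String]): Unit = {
    println(dataTypeOf)       // Map(_1 -> IntegerType)
    println(pushedDown("_1")) // Some(notEq(_1, null))
  }
}
```

Had the struct entry survived, the partial function would be undefined for it and `pushedDown("_1")` would be `None`, meaning no filter is pushed down.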
As a result, Parquet filter2 produces incorrect results. For example, the code below:
```scala
df.filter("_1 IS NOT NULL").count()
```
always produces 0.
This PR prevents this by no longer collecting nested fields in `getFieldMap`;
only root-level fields are looked up.
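The difference between the two lookups can be sketched as follows. This is an illustration under stated assumptions, not the patched code: the types are simplified stand-ins, the names `flattenedFieldMap` and `rootFieldMap` are hypothetical, and the optional-field metadata check from the real `getFieldMap` is omitted.

```scala
object RootOnlyFieldMap {
  // Simplified stand-ins for Spark's types (hypothetical, for illustration only).
  sealed trait DataType
  case object IntegerType extends DataType
  final case class StructField(name: String, dataType: DataType)
  final case class StructType(fields: List[StructField]) extends DataType

  // Previous behaviour: root fields plus every nested field, collected recursively.
  def flattenedFieldMap(dataType: DataType): List[(String, DataType)] = dataType match {
    case StructType(fields) =>
      fields.map(f => f.name -> f.dataType) ++ fields.flatMap(f => flattenedFieldMap(f.dataType))
    case _ => Nil
  }

  // Behaviour after the change: root fields only, so nested names cannot shadow outer ones.
  def rootFieldMap(dataType: DataType): List[(String, DataType)] = dataType match {
    case StructType(fields) => fields.map(f => f.name -> f.dataType)
    case _ => Nil
  }

  val inner: StructType = StructType(List(StructField("_1", IntegerType)))
  val schema: StructType = StructType(List(StructField("_1", inner)))

  def main(args: Array[String]): Unit = {
    // Flattening lets the inner integer overwrite the outer struct under the key `_1`.
    assert(flattenedFieldMap(schema).toMap.apply("_1") == IntegerType)
    // Looking only at root fields keeps the outer struct type.
    assert(rootFieldMap(schema).toMap.apply("_1") == inner)
  }
}
```

With the root-only map, `_1` resolves to the struct type, for which no filter is created, so the query falls back to evaluating the predicate in Spark.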
## How was this patch tested?
Unit test in `ParquetFilterSuite`.
Author: hyukjinkwon <[email protected]>
Closes #14067 from HyukjinKwon/SPARK-16371.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8ceed5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8ceed5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8ceed5
Branch: refs/heads/master
Commit: 4f8ceed59367319300e4bfa5b957c387be81ffa3
Parents: 480357c
Author: hyukjinkwon <[email protected]>
Authored: Wed Jul 6 12:42:16 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Jul 6 12:42:16 2016 -0700
----------------------------------------------------------------------
.../datasources/parquet/ParquetFileFormat.scala | 2 +-
.../datasources/parquet/ParquetFilters.scala | 5 ++++-
.../datasources/parquet/ParquetFilterSuite.scala | 14 ++++++++++++++
3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9833620..76d7f5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory(
ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
// We want to clear this temporary metadata from saving into Parquet file.
- // This metadata is only useful for detecting optional columns when pushdowning filters.
+ // This metadata is only useful for detecting optional columns when pushing down filters.
val dataSchemaToWrite = StructType.removeMetadata(
StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7213a38..e0a113a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -185,10 +185,13 @@ private[sql] object ParquetFilters {
*/
private def getFieldMap(dataType: DataType): Array[(String, DataType)] =
dataType match {
case StructType(fields) =>
+ // Here we don't flatten the fields in the nested schema but just look up through
+ // root fields. Currently, accessing to nested fields does not push down filters
+ // and it does not support to create filters for them.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
- }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) }
+ }.map(f => f.name -> f.dataType)
case _ => Array.empty[(String, DataType)]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2a5666e..18a3128 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -543,4 +543,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}
+
+ test("Do not push down filters incorrectly when inner name and outer name are the same") {
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
+ // Here the schema becomes as below:
+ //
+ // root
+ // |-- _1: struct (nullable = true)
+ // | |-- _1: integer (nullable = true)
+ //
+ // The inner column name, `_1` and outer column name `_1` are the same.
+ // Obviously this should not push down filters because the outer column is struct.
+ assert(df.filter("_1 IS NOT NULL").count() === 4)
+ }
+ }
}