Repository: spark
Updated Branches:
  refs/heads/branch-2.0 03f336d89 -> 2465f0728


[SPARK-16371][SQL] Do not push down filters incorrectly when inner name and 
outer name are the same in Parquet

## What changes were proposed in this pull request?

Currently, suppose there is a schema as below:

```
root
  |-- _1: struct (nullable = true)
  |    |-- _1: integer (nullable = true)
```

and we execute the code below:

```scala
df.filter("_1 IS NOT NULL").count()
```

This pushes down a filter even though the filter is applied to a `StructType` 
column. (If my understanding is correct, Spark does not push down filters for 
`StructType` columns.)
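
For reference, such a DataFrame can be built as in the new test below does; a 
minimal sketch, assuming a `SparkSession` named `spark` with implicits in scope:

```scala
import spark.implicits._

// Nesting a Tuple1 inside a Tuple1 gives both the outer struct column
// and its inner field the default name `_1`.
val df = (1 to 4).map(i => Tuple1(Tuple1(i))).toDF()
df.printSchema()
// root
//  |-- _1: struct (nullable = true)
//  |    |-- _1: integer (nullable = true)
```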

The reason is that `ParquetFilters.getFieldMap` produces the results below:

```
(_1,StructType(StructField(_1,IntegerType,true)))
(_1,IntegerType)
```

and this then collapses into a `Map`:

```
(_1,IntegerType)
```
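
This collapse is plain `Map` construction semantics: the later duplicate key 
wins. A minimal standalone sketch, with types mirroring the entries above:

```scala
import org.apache.spark.sql.types._

// Converting an Array of pairs to a Map keeps only the last entry per
// key, so the struct entry for `_1` is silently overwritten.
val pairs = Array[(String, DataType)](
  "_1" -> StructType(Seq(StructField("_1", IntegerType))),
  "_1" -> IntegerType)
pairs.toMap  // Map(_1 -> IntegerType)
```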

Now, because of `....lift(dataTypeOf(name)).map(_(name, value))`, this pushes 
down a filter for `_1`, which Parquet believes is `IntegerType` when it is 
actually `StructType`.
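
A simplified sketch of that `lift` dispatch; `makeNotNull` and its string 
result are hypothetical stand-ins for the real predicate builders in 
`ParquetFilters`, which return Parquet `FilterPredicate`s:

```scala
import org.apache.spark.sql.types._

// Hypothetical stand-in for a predicate builder keyed by DataType;
// the String result is a placeholder for a Parquet FilterPredicate.
val makeNotNull: PartialFunction[DataType, String => String] = {
  case IntegerType => (name: String) => s"notEq($name, null)"
}

val dataTypeOf = Map[String, DataType]("_1" -> IntegerType)

// `lift` turns the partial function into a total DataType => Option[...];
// because the map (wrongly) says `_1` is IntegerType, a predicate is
// produced for a column that is really a struct.
val pushed = makeNotNull.lift(dataTypeOf("_1")).map(_("_1"))
// pushed: Some("notEq(_1, null)")
```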

As a result, Parquet's filter2 API produces incorrect results. For example, 
the code below:

```scala
df.filter("_1 IS NOT NULL").count()
```

always returns 0.

This PR prevents this by no longer collecting nested fields in `getFieldMap`.
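
With the change, `getFieldMap` no longer recurses, so for the schema above it 
should yield only the root entry:

```
(_1,StructType(StructField(_1,IntegerType,true)))
```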

## How was this patch tested?

Unit test in `ParquetFilterSuite`.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #14067 from HyukjinKwon/SPARK-16371.

(cherry picked from commit 4f8ceed59367319300e4bfa5b957c387be81ffa3)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2465f072
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2465f072
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2465f072

Branch: refs/heads/branch-2.0
Commit: 2465f0728e95109ab851ab09b5badd697928ba2b
Parents: 03f336d
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Wed Jul 6 12:42:16 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jul 6 12:42:37 2016 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala       |  2 +-
 .../datasources/parquet/ParquetFilters.scala          |  5 ++++-
 .../datasources/parquet/ParquetFilterSuite.scala      | 14 ++++++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2465f072/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f38bf81..8cbdaeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -436,7 +436,7 @@ private[sql] class ParquetOutputWriterFactory(
     ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when pushdowning filters.
+    // This metadata is only useful for detecting optional columns when pushing down filters.
     val dataSchemaToWrite = StructType.removeMetadata(
       StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]

http://git-wip-us.apache.org/repos/asf/spark/blob/2465f072/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 95afdc7..70ae829 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -215,10 +215,13 @@ private[sql] object ParquetFilters {
    */
   private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match {
     case StructType(fields) =>
+      // Here we don't flatten the fields in the nested schema but just look up
+      // the root fields. Currently, accessing nested fields does not push down
+      // filters, and creating filters for them is not supported.
       fields.filter { f =>
         !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
           !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) }
+      }.map(f => f.name -> f.dataType)
     case _ => Array.empty[(String, DataType)]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2465f072/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45fd6a5..35d6915 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -545,4 +545,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("Do not push down filters incorrectly when inner name and outer name 
are the same") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
+      // Here the schema becomes as below:
+      //
+      // root
+      //  |-- _1: struct (nullable = true)
+      //  |    |-- _1: integer (nullable = true)
+      //
+      // The inner column name `_1` and the outer column name `_1` are the same.
+      // Obviously this should not push down filters because the outer column is a struct.
+      assert(df.filter("_1 IS NOT NULL").count() === 4)
+    }
+  }
 }

