Repository: spark
Updated Branches:
refs/heads/master 169b9d73e -> 89ae26dcd
[SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side
post-filter for FileFormat datasources
## What changes were proposed in this pull request?
Currently, `FileSourceStrategy` does not handle the case where the pushed-down
filter is `Literal(null)`, and drops it from the Spark-side post-filter.
For example, the code below:
```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```
shows that `null` is kept properly.
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]
== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]
== Physical Plan ==
*Filter (isnotnull(_1#17) && null) << Here `null` is there
+- LocalTableScan [_1#17]
```
However, when we read it back from Parquet,
```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```
`null` is removed from the post-filter.
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet
== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet
== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11) << Here `null` is missing
+- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location:
InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters:
[IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```
This PR fixes it so that the `null` filter is kept properly. In more detail, in
```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```
the `null` ends up in `partitionKeyFilters`, because a `Literal` never has
`children`, so its `references` set is empty, and the empty set is always a
subset of `partitionSet`.
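
This classification can be illustrated with a plain-Scala sketch (ordinary
`Set`s standing in for Catalyst's `AttributeSet`; the column names are
illustrative, not taken from the PR):

```scala
// Sketch only: plain Scala sets, not Catalyst classes.
val partitionSet    = Set("year", "month")  // hypothetical partition columns
val literalNullRefs = Set.empty[String]     // Literal(null).references is empty
val dataFilterRefs  = Set("_1")             // a filter on a data column

// The empty set is a subset of any set, so the null literal is
// (incorrectly) classified as a partition-key filter.
println(literalNullRefs.subsetOf(partitionSet))  // true
println(dataFilterRefs.subsetOf(partitionSet))   // false
```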
And then in
```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```
the `null` is always removed from the post-filter. So, if a filter references
no fields, it should be applied to the data columns too.
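
The effect of the one-line fix can be sketched with plain Scala sets standing
in for `ExpressionSet` (the `Filter` case class and names here are illustrative
stand-ins, not Catalyst API):

```scala
// Each filter is modeled by the attribute names it references.
case class Filter(name: String, references: Set[String])

val isNotNull = Filter("isnotnull(_1)", Set("_1"))
val nullLit   = Filter("null", Set.empty)   // Literal(null): no references
val filterSet = Set(isNotNull, nullLit)

// With no partition columns, only the empty-references filter qualifies.
val partitionKeyFilters =
  filterSet.filter(_.references.subsetOf(Set.empty[String]))

// Before the fix: the null literal is subtracted away from the post-filter.
val before = filterSet -- partitionKeyFilters
// before == Set(isNotNull)

// After the fix: only filters that actually reference partition columns
// are removed, so the null literal survives as a post-filter.
val after = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
// after == Set(isNotNull, nullLit)
```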
After this PR, the plan becomes as below:
```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet
== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet
== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet
== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
+- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat,
Location:
InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b...,
PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema:
struct<_1:boolean>
```
## How was this patch tested?
Unit test in `FileSourceStrategySuite`
Author: hyukjinkwon <[email protected]>
Closes #16184 from HyukjinKwon/SPARK-18753.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89ae26dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89ae26dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89ae26dc
Branch: refs/heads/master
Commit: 89ae26dcdb73266fbc3a8b6da9f5dff30dc4ec95
Parents: 169b9d7
Author: hyukjinkwon <[email protected]>
Authored: Wed Dec 14 11:29:11 2016 -0800
Committer: Cheng Lian <[email protected]>
Committed: Wed Dec 14 11:29:11 2016 -0800
----------------------------------------------------------------------
.../sql/execution/datasources/FileSourceStrategy.scala | 2 +-
.../execution/datasources/FileSourceStrategySuite.scala | 11 +++++++++++
2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/89ae26dc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 55ca4f1..ead3233 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -86,7 +86,7 @@ object FileSourceStrategy extends Strategy with Logging {
val dataFilters =
normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
// Predicates with both partition keys and attributes need to be evaluated after the scan.
- val afterScanFilters = filterSet -- partitionKeyFilters
+ val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
val filterAttributes = AttributeSet(afterScanFilters)
http://git-wip-us.apache.org/repos/asf/spark/blob/89ae26dc/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index d900ce7..f361628 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -476,6 +476,17 @@ class FileSourceStrategySuite extends QueryTest with
SharedSQLContext with Predi
}
}
+ test("[SPARK-18753] keep pushed-down null literal as a filter in Spark-side post-filter") {
+ val ds = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDS()
+ withTempPath { p =>
+ val path = p.getAbsolutePath
+ ds.write.parquet(path)
+ val readBack = spark.read.parquet(path).filter($"_1" === "true")
+ val filtered = ds.filter($"_1" === "true").toDF()
+ checkAnswer(readBack, filtered)
+ }
+ }
+
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =