[
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16827449#comment-16827449
]
Josh Rosen edited comment on SPARK-27213 at 4/27/19 5:13 AM:
-------------------------------------------------------------
Thank you for the very detailed bug report; including the full example, output,
and query plans is very helpful.
Since this sounds like a legitimate query correctness bug, I'm adding the
{{correctness}} label.
/cc [~smilegator] FYI
> Unexpected results when filter is used after distinct
> -----------------------------------------------------
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Rinaz Belhaj
> Priority: Major
> Labels: correctness, distinct, filter
>
> The following code gives unexpected output because of how the filter is
> handled by the Catalyst optimizer.
> {code:python}
> df = spark.createDataFrame(
>     [['a', '123', '12.3', 'n'], ['a', '123', '12.3', 'y'], ['a', '123', '12.4', 'y']],
>     ['x', 'y', 'z', 'y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> ||x||y||z||y_n||
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>
> ||x||y||z||
> |a|123|12.3|
> |a|123|12.4|
>
> ||x||y||z||
> |a|123|12.4|
> {panel}
> Ideally, the second statement should raise an error, since the column used
> in the filter is not present in the preceding select. Instead, the Catalyst
> optimizer resolves the filter by taking first() of column y_n within each
> distinct group and then applying the filter to that value.
> Even if the filter had been pushed down below the distinct, the result would
> have been accurate.
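> The two orderings can be mimicked outside Spark: deduplicate on (x, y, z)
> while keeping the first y_n seen per group (which is what first(y_n) in the
> optimized plan does), then filter, versus filtering first. This is a minimal
> plain-Python sketch of the semantics, not Spark's implementation.

```python
# Rows from the example above: (x, y, z, y_n)
rows = [('a', '123', '12.3', 'n'),
        ('a', '123', '12.3', 'y'),
        ('a', '123', '12.4', 'y')]

def distinct_then_filter(rows):
    # Mimic the optimized plan: deduplicate on (x, y, z), keeping the
    # first y_n per group, then filter on that retained y_n.
    first_y_n = {}
    for x, y, z, y_n in rows:
        first_y_n.setdefault((x, y, z), y_n)
    return sorted(key for key, y_n in first_y_n.items() if y_n == 'y')

def filter_then_distinct(rows):
    # The intuitive order: filter on y_n first, then deduplicate.
    return sorted({(x, y, z) for x, y, z, y_n in rows if y_n == 'y'})

# The (12.3, 'n') row wins the group for z=12.3, so that group is
# dropped by the late filter: only the 12.4 row survives.
print(distinct_then_filter(rows))
# Filtering first keeps both z values, matching the expected output.
print(filter_then_distinct(rows))
```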
> {code:python}
> df = spark.createDataFrame(
>     [['a', '123', '12.3', 'n'], ['a', '123', '12.3', 'y'], ['a', '123', '12.4', 'y']],
>     ['x', 'y', 'z', 'y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
> {code}
> {panel:title=Output}
>
> == Parsed Logical Plan ==
> Deduplicate [x#74, y#75, z#76]
> +- AnalysisBarrier
>       +- Project [x#74, y#75, z#76]
>          +- Filter (y_n#77 = y)
>             +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
>    +- Filter (y_n#77 = y)
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Optimized Logical Plan ==
> Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
>    +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
> +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>    +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
>       +- *(1) Project [x#74, y#75, z#76]
>          +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>             +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>
>
> -------------------------------------------------------------------------------------------------------------------
>
>
> == Parsed Logical Plan ==
> 'Filter ('y_n = y)
> +- AnalysisBarrier
>       +- Deduplicate [x#74, y#75, z#76]
>          +- Project [x#74, y#75, z#76]
>             +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
>    +- Deduplicate [x#74, y#75, z#76]
>       +- Project [x#74, y#75, z#76, y_n#77]
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Optimized Logical Plan ==
> Project [x#74, y#75, z#76]
> +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>    +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Physical Plan ==
> *(3) Project [x#74, y#75, z#76]
> +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>    +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
>       +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>             +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
>                +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>                   +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>
> {panel}
> The second query, i.e.
> *"df.select('x','y','z').distinct().filter("y_n='y'")"*, should
> result in an error rather than return an incorrect result.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)