[
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16827591#comment-16827591
]
Junichi Koizumi commented on SPARK-27213:
-------------------------------------------
The first query below explains fine, but when I run the second query it fails
with an AnalysisException:
>>> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
== Parsed Logical Plan ==
Aggregate [x#11,y#12,z#13], [x#11,y#12,z#13]
+- Project [x#11,y#12,z#13]
   +- Filter (y_n#14 = y)
      +- LogicalRDD [x#11,y#12,z#13,y_n#14], MapPartitionsRDD[23] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Analyzed Logical Plan ==
x: string, y: string, z: string
Aggregate [x#11,y#12,z#13], [x#11,y#12,z#13]
+- Project [x#11,y#12,z#13]
   +- Filter (y_n#14 = y)
      +- LogicalRDD [x#11,y#12,z#13,y_n#14], MapPartitionsRDD[23] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Optimized Logical Plan ==
Aggregate [x#11,y#12,z#13], [x#11,y#12,z#13]
+- Project [x#11,y#12,z#13]
   +- Filter (y_n#14 = y)
      +- LogicalRDD [x#11,y#12,z#13,y_n#14], MapPartitionsRDD[23] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Physical Plan ==
TungstenAggregate(key=[x#11,y#12,z#13], functions=[], output=[x#11,y#12,z#13])
+- TungstenExchange hashpartitioning(x#11,y#12,z#13,200), None
   +- TungstenAggregate(key=[x#11,y#12,z#13], functions=[], output=[x#11,y#12,z#13])
      +- Project [x#11,y#12,z#13]
         +- Filter (y_n#14 = y)
            +- Scan ExistingRDD[x#11,y#12,z#13,y_n#14]
>>> df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/dataframe.py", line 900, in filter
    jdf = self._jdf.filter(condition)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'y_n' given input columns: [x, y, z];"
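
The check behind this error can be pictured as a name lookup: every column the filter references must appear in the output of its child, which here is only the three columns produced by select('x','y','z').distinct(). The sketch below is a toy model in plain Python, not Spark's analyzer code; AnalysisError and resolve_filter_columns are hypothetical names, and the message format is copied from the traceback above.

```python
# Toy model (not Spark's actual analyzer) of resolving the columns a
# filter references against the columns produced by its child operator.


class AnalysisError(Exception):
    """Hypothetical stand-in for pyspark.sql.utils.AnalysisException."""


def resolve_filter_columns(referenced, input_columns):
    """Return the referenced columns if all exist in the input; else raise."""
    for name in referenced:
        if name not in input_columns:
            raise AnalysisError(
                "cannot resolve '%s' given input columns: [%s];"
                % (name, ", ".join(input_columns))
            )
    return list(referenced)


# distinct() over select('x', 'y', 'z') only outputs these three columns.
distinct_output = ["x", "y", "z"]

try:
    resolve_filter_columns(["y_n"], distinct_output)
except AnalysisError as err:
    print(err)  # cannot resolve 'y_n' given input columns: [x, y, z];
```

This is the strict behaviour the original report below asks for; it does not model how later Spark versions try to re-add the missing column.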
> Unexpected results when filter is used after distinct
> -----------------------------------------------------
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Rinaz Belhaj
> Priority: Major
> Labels: correctness, distinct, filter
>
> The following code gives unexpected output because the filter is not pushed
> down by the Catalyst optimizer.
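> {code:python}
> from pyspark.sql import SparkSession
> # The pyspark shell already defines `spark`; getOrCreate() reuses it.
> spark = SparkSession.builder.getOrCreate()
> df = spark.createDataFrame(
>     [['a','123','12.3','n'], ['a','123','12.3','y'], ['a','123','12.4','y']],
>     ['x','y','z','y_n'])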
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error, since the column used
> in the filter is not present in the preceding select. Instead, Catalyst keeps
> y_n by taking first() of it for each distinct (x, y, z) group and then applies
> the filter to that single value.
> Had the filter been pushed down below the distinct, the result would at least
> have been correct.
> {code:python}
> df = spark.createDataFrame(
>     [['a','123','12.3','n'], ['a','123','12.3','y'], ['a','123','12.4','y']],
>     ['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True)
> {code}
> {panel:title=Output}
>
> == Parsed Logical Plan ==
> Deduplicate [x#74, y#75, z#76]
> +- AnalysisBarrier
>    +- Project [x#74, y#75, z#76]
>       +- Filter (y_n#77 = y)
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Deduplicate [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
>    +- Filter (y_n#77 = y)
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Optimized Logical Plan ==
> Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
> +- Project [x#74, y#75, z#76]
>    +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
> +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>    +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
>       +- *(1) Project [x#74, y#75, z#76]
>          +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>             +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>
>
> -------------------------------------------------------------------------------------------------------------------
>
>
> == Parsed Logical Plan ==
> 'Filter ('y_n = y)
> +- AnalysisBarrier
>    +- Deduplicate [x#74, y#75, z#76]
>       +- Project [x#74, y#75, z#76]
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Analyzed Logical Plan ==
> x: string, y: string, z: string
> Project [x#74, y#75, z#76]
> +- Filter (y_n#77 = y)
>    +- Deduplicate [x#74, y#75, z#76]
>       +- Project [x#74, y#75, z#76, y_n#77]
>          +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Optimized Logical Plan ==
> Project [x#74, y#75, z#76]
> +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>    +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
>       +- LogicalRDD [x#74, y#75, z#76, y_n#77], false
>
> == Physical Plan ==
> *(3) Project [x#74, y#75, z#76]
> +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>    +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
>       +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>             +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
>                +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
>                   +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
>
> {panel}
> The second query, i.e.
> *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should
> result in an error rather than giving wrong output.
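
The wrong result can be reproduced without Spark by following the two optimized logical plans step by step. This is a minimal pure-Python sketch, assuming the semantics visible in those plans: the first query filters before deduplicating, while the second groups by (x, y, z), keeps first(y_n) per group, and only then filters. The function names are hypothetical. Spark's first() does not guarantee which row of a group it returns; the sketch assumes input order, which happens to match the reported output.

```python
# Toy simulation of the two queries from SPARK-27213 in plain Python.
# Function names are hypothetical; this is not how Spark executes plans.

ROWS = [
    {"x": "a", "y": "123", "z": "12.3", "y_n": "n"},
    {"x": "a", "y": "123", "z": "12.3", "y_n": "y"},
    {"x": "a", "y": "123", "z": "12.4", "y_n": "y"},
]
KEYS = ("x", "y", "z")


def key_of(row):
    """Project a row onto the (x, y, z) columns kept by select()."""
    return tuple(row[k] for k in KEYS)


def filter_then_distinct(rows):
    """Query 1: Filter(y_n = 'y') -> Project(x, y, z) -> distinct."""
    kept = [key_of(r) for r in rows if r["y_n"] == "y"]
    # dict.fromkeys deduplicates while preserving first-seen order.
    return list(dict.fromkeys(kept))


def distinct_with_first_then_filter(rows):
    """Query 2 as optimized: Aggregate by (x, y, z) with first(y_n),
    then Filter(y_n = 'y') on the one surviving value per group."""
    first_y_n = {}
    for r in rows:
        # setdefault keeps only the y_n of the first row seen per group.
        first_y_n.setdefault(key_of(r), r["y_n"])
    return [k for k, y_n in first_y_n.items() if y_n == "y"]


print(filter_then_distinct(ROWS))
# [('a', '123', '12.3'), ('a', '123', '12.4')]
print(distinct_with_first_then_filter(ROWS))
# [('a', '123', '12.4')]
```

The group (a, 123, 12.3) is dropped by the second function because the first y_n it sees is 'n', even though another row in that group has 'y'. If the input order changes, the first y_n seen for that group can also change, which is why this kind of rewrite can give order-dependent results.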
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)