Rinaz Belhaj created SPARK-27213:
------------------------------------

             Summary: Unexpected results when filter is used after distinct
                 Key: SPARK-27213
                 URL: https://issues.apache.org/jira/browse/SPARK-27213
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.4.0, 2.3.2
            Reporter: Rinaz Belhaj


The following code gives unexpected output because the filter is not pushed down by the Catalyst optimizer.

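{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [['a', '123', '12.3', 'n'], ['a', '123', '12.3', 'y'], ['a', '123', '12.4', 'y']],
    ['x', 'y', 'z', 'y_n'])
df.show(5)
# Filter first, then deduplicate
df.filter("y_n='y'").select('x', 'y', 'z').distinct().show()
# Deduplicate first, then filter on y_n, which select() already dropped: no error is raised
df.select('x', 'y', 'z').distinct().filter("y_n='y'").show()
{code}
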
{panel:title=Output}
{{df.show(5)}}:
||x||y||z||y_n||
|a|123|12.3|n|
|a|123|12.3|y|
|a|123|12.4|y|

Filter, then distinct:
||x||y||z||
|a|123|12.3|
|a|123|12.4|

Distinct, then filter:
||x||y||z||
|a|123|12.4|
{panel}

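Ideally, the second query (distinct, then filter) should fail with an error, since the column used in the filter ({{y_n}}) is not present in the output of the preceding select. Instead, Catalyst keeps {{y_n}} alive underneath the distinct: the analyzed plan adds {{y_n}} to the Project below the Deduplicate, and the optimized plan turns the Deduplicate into an Aggregate that computes {{first(y_n)}} for each (x, y, z) group and only then applies the filter.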

Had the filter at least been pushed down below the distinct, the result would have been correct.
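
To make the difference concrete, here is a minimal plain-Python sketch (no Spark involved) that mimics the two optimized logical plans in this report on the same three rows. The function names are hypothetical, and it assumes that {{first()}} picks the first row in input order; Spark's {{first()}} makes no such guarantee after a shuffle, so on real data the second query's result can vary.

```python
# Plain-Python model of the two optimized plans from this report (not Spark code).
# Rows are (x, y, z, y_n), matching the DataFrame in the report.
ROWS = [
    ("a", "123", "12.3", "n"),
    ("a", "123", "12.3", "y"),
    ("a", "123", "12.4", "y"),
]


def filter_then_distinct(rows):
    """First query: filter y_n = 'y', project (x, y, z), then deduplicate."""
    result = []
    for x, y, z, y_n in rows:
        if y_n == "y" and (x, y, z) not in result:
            result.append((x, y, z))
    return result


def distinct_with_first_then_filter(rows):
    """Second query as optimized: group by (x, y, z), keep first(y_n) per group,
    then filter on that single surviving y_n value."""
    first_y_n = {}  # dicts preserve insertion order, so groups stay in input order
    for x, y, z, y_n in rows:
        # Assumption: "first" means first in input order; Spark does not guarantee this.
        first_y_n.setdefault((x, y, z), y_n)
    return [key for key, y_n in first_y_n.items() if y_n == "y"]


print(filter_then_distinct(ROWS))             # [('a', '123', '12.3'), ('a', '123', '12.4')]
print(distinct_with_first_then_filter(ROWS))  # [('a', '123', '12.4')]
```

The (a, 123, 12.3) group keeps {{y_n = 'n'}} from its first row, so the filter discards the whole group even though one of its rows has {{y_n = 'y'}}.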

 

{code:python}
df = spark.createDataFrame(
    [['a', '123', '12.3', 'n'], ['a', '123', '12.3', 'y'], ['a', '123', '12.4', 'y']],
    ['x', 'y', 'z', 'y_n'])

df.filter("y_n='y'").select('x', 'y', 'z').distinct().explain(True)

df.select('x', 'y', 'z').distinct().filter("y_n='y'").explain(True)
{code}

{panel:title=Output}
First query: {{df.filter("y_n='y'").select('x','y','z').distinct()}}
{noformat}
== Parsed Logical Plan ==
Deduplicate [x#74, y#75, z#76]
+- AnalysisBarrier
      +- Project [x#74, y#75, z#76]
         +- Filter (y_n#77 = y)
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Analyzed Logical Plan ==
x: string, y: string, z: string
Deduplicate [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
   +- Filter (y_n#77 = y)
      +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Optimized Logical Plan ==
Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
+- Project [x#74, y#75, z#76]
   +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
      +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Physical Plan ==
*(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
+- Exchange hashpartitioning(x#74, y#75, z#76, 10)
   +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
      +- *(1) Project [x#74, y#75, z#76]
         +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
            +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
{noformat}

----

Second query: {{df.select('x','y','z').distinct().filter("y_n='y'")}}
{noformat}
== Parsed Logical Plan ==
'Filter ('y_n = y)
+- AnalysisBarrier
      +- Deduplicate [x#74, y#75, z#76]
         +- Project [x#74, y#75, z#76]
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Analyzed Logical Plan ==
x: string, y: string, z: string
Project [x#74, y#75, z#76]
+- Filter (y_n#77 = y)
   +- Deduplicate [x#74, y#75, z#76]
      +- Project [x#74, y#75, z#76, y_n#77]
         +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Optimized Logical Plan ==
Project [x#74, y#75, z#76]
+- Filter (isnotnull(y_n#77) && (y_n#77 = y))
   +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
      +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

== Physical Plan ==
*(3) Project [x#74, y#75, z#76]
+- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
   +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
      +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
            +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
               +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
                  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
{noformat}
{panel}

The second query, i.e. {{df.select('x','y','z').distinct().filter("y_n='y'")}}, should fail with an analysis error rather than silently returning wrong output.
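
For contrast, the behaviour requested here can be sketched with a toy plan model. This is not how Spark's analyzer works: the nested-tuple plan format, {{output_columns}} and {{AnalysisError}} are hypothetical names made up for illustration. The only rule it enforces is that a filter may reference only columns produced by its direct child, so the second query is rejected instead of being rewritten.

```python
# Toy model (NOT Spark's analyzer) of strict column resolution: a filter may
# only reference columns produced by its direct child.


class AnalysisError(Exception):
    """Hypothetical error type standing in for an analysis failure."""


def output_columns(plan):
    """Return the output column names of a tiny plan given as nested tuples."""
    op = plan[0]
    if op == "scan":          # ("scan", [columns])
        return list(plan[1])
    if op == "select":        # ("select", [columns], child)
        child_cols = output_columns(plan[2])
        missing = [c for c in plan[1] if c not in child_cols]
        if missing:
            raise AnalysisError(f"cannot resolve {missing} among {child_cols}")
        return list(plan[1])
    if op == "distinct":      # ("distinct", child): same columns as the child
        return output_columns(plan[1])
    if op == "filter":        # ("filter", column, child): column must come from child
        child_cols = output_columns(plan[2])
        if plan[1] not in child_cols:
            raise AnalysisError(f"cannot resolve {plan[1]!r} among {child_cols}")
        return child_cols
    raise ValueError(f"unknown operator {op!r}")


scan = ("scan", ["x", "y", "z", "y_n"])
query1 = ("distinct", ("select", ["x", "y", "z"], ("filter", "y_n", scan)))
query2 = ("filter", "y_n", ("distinct", ("select", ["x", "y", "z"], scan)))

print(output_columns(query1))  # ['x', 'y', 'z']
try:
    output_columns(query2)
except AnalysisError as err:
    print("query2 rejected:", err)
```

Under this rule the first query resolves normally, while the second one fails because {{y_n}} is no longer among the columns produced by the distinct.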



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)