[ https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Abhijit Bhole updated SPARK-21034:
----------------------------------
    Description:
If the column is involved in an aggregation / join, then pushing down the filter should not change the results. Here is sample code:

{code:java}
from pyspark.sql import functions as F

df = spark.createDataFrame([{"a": 1, "b": 2, "c": 7}, {"a": 3, "b": 4, "c": 8},
                            {"a": 1, "b": 5, "c": 7}, {"a": 1, "b": 6, "c": 7}])

df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()
df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()

== Physical Plan ==
*HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
+- Exchange hashpartitioning(a#15L, 4)
   +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
      +- *Project [a#15L, b#16L]
         +- *Filter (isnotnull(a#15L) && (a#15L = 1))
            +- Scan ExistingRDD[a#15L,b#16L,c#17L]

>>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
== Physical Plan ==
*Filter (isnotnull(a#15L) && (a#15L = 1))
+- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
   +- Exchange hashpartitioning(a#15L, 4)
      +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)])
         +- Scan ExistingRDD[a#15L,b#16L,c#17L]
{code}

As you can see, the filter is not pushed down when the F.first aggregate function is used.
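The safety argument behind this request can be illustrated outside Spark with a small plain-Python sketch (the `group_sum` helper is a hypothetical stand-in, not a Spark API): because the predicate references only the grouping key, filtering before aggregation keeps exactly the same groups as filtering after, even when the aggregate itself (like first(c)) is non-deterministic.

```python
# Toy rows mirroring the DataFrame in the report.
rows = [
    {"a": 1, "b": 2, "c": 7},
    {"a": 3, "b": 4, "c": 8},
    {"a": 1, "b": 5, "c": 7},
    {"a": 1, "b": 6, "c": 7},
]

def group_sum(data):
    """Group by key "a" and sum "b" — a stand-in for groupBy("a").agg(sum("b"))."""
    out = {}
    for r in data:
        out[r["a"]] = out.get(r["a"], 0) + r["b"]
    return out

# Filter AFTER aggregation (what the query writes) ...
after = {k: v for k, v in group_sum(rows).items() if k == 1}

# ... versus filter BEFORE aggregation (what pushdown would produce).
before = group_sum([r for r in rows if r["a"] == 1])

# The predicate touches only the grouping key, so pushing the filter
# never changes which rows land in the surviving group.
assert after == before == {1: 13}
```

The same reasoning is why the report argues the optimizer could push the filter below the aggregate even in the F.first case: non-determinism only affects the value chosen within a group, never the group membership decided by the key.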
> Allow filter pushdown filters through non deterministic functions for columns
> involved in groupby / join
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21034
>                 URL: https://issues.apache.org/jira/browse/SPARK-21034
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Abhijit Bhole
>
> If the column is involved in an aggregation / join, then pushing down the filter
> should not change the results.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org