[ https://issues.apache.org/jira/browse/SPARK-17099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424356#comment-15424356 ]
Herman van Hovell commented on SPARK-17099:
-------------------------------------------

TL;DR: This is caused by a bug in the optimizer's {{InferFiltersFromConstraints}} rule. If you look at the (optimized) query plan:

{noformat}
== Analyzed Logical Plan ==
sum(coalesce(int_col_5, int_col_2)): bigint, (coalesce(int_col_5, int_col_2) * 2): int
Project [sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5, int_col_2) * 2)#32]
+- Filter (sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L > cast((coalesce(int_col_5#4, int_col_2#13)#38 * 2) as bigint))
   +- Aggregate [greatest(coalesce(int_col_5#14, 109), coalesce(int_col_5#4, -449)), coalesce(int_col_5#4, int_col_2#13)], [sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5#4, int_col_2#13) * 2) AS (coalesce(int_col_5, int_col_2) * 2)#32, sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L, coalesce(int_col_5#4, int_col_2#13) AS coalesce(int_col_5#4, int_col_2#13)#38]
      +- Join RightOuter, (int_col_2#13 = int_col_5#4)
         :- SubqueryAlias t1
         :  +- Project [value#2 AS int_col_5#4]
         :     +- SerializeFromObject [input[0, int, true] AS value#2]
         :        +- ExternalRDD [obj#1]
         +- SubqueryAlias t2
            +- Project [_1#10 AS int_col_2#13, _2#11 AS int_col_5#14]
               +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1 AS _1#10, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#11]
                  +- ExternalRDD [obj#9]

== Optimized Logical Plan ==
Project [sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5, int_col_2) * 2)#32]
+- Filter (isnotnull(sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L) && (sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L > cast((coalesce(int_col_5#4, int_col_2#13)#38 * 2) as bigint)))
   +- Aggregate [greatest(coalesce(int_col_5#14, 109), coalesce(int_col_5#4, -449)), coalesce(int_col_5#4, int_col_2#13)], [sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5#4, int_col_2#13) * 2) AS (coalesce(int_col_5, int_col_2) * 2)#32, sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L, coalesce(int_col_5#4, int_col_2#13) AS coalesce(int_col_5#4, int_col_2#13)#38]
      +- Join Inner, (isnotnull(coalesce(int_col_5#4, int_col_2#13)) && (int_col_2#13 = int_col_5#4))
         :- Project [value#2 AS int_col_5#4]
         :  +- Filter (isnotnull(value#2) && isnotnull(coalesce(value#2, value#2)))
         :     +- SerializeFromObject [input[0, int, true] AS value#2]
         :        +- ExternalRDD [obj#1]
         +- Project [_1#10 AS int_col_2#13, _2#11 AS int_col_5#14]
            +- Filter (isnotnull(coalesce(_1#10, _1#10)) && isnotnull(_1#10))
               +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1 AS _1#10, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#11]
                  +- ExternalRDD [obj#9]
{noformat}

It seems the rule is pushing a not-null predicate down through an outer join, which defeats the whole idea of an outer join: note how the {{Join RightOuter}} in the analyzed plan has become a {{Join Inner}} with {{isnotnull}} filters pushed into both inputs in the optimized plan. Disabling the {{InferFiltersFromConstraints}} rule gives the correct answer.

> Incorrect result when HAVING clause is added to group by query
> --------------------------------------------------------------
>
>                 Key: SPARK-17099
>                 URL: https://issues.apache.org/jira/browse/SPARK-17099
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: Josh Rosen
>            Priority: Critical
>             Fix For: 2.1.0
>
>
> Random query generation uncovered the following query which returns incorrect
> results when run on Spark SQL. This wasn't the original query uncovered by
> the generator, since I performed a bit of minimization to try to make it more
> understandable.
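To make the diagnosis above concrete before the quoted report: no value in t1 matches any join key in t2, so a correct right outer join must keep every t2 row with a null t1 side, while the inferred not-null filters effectively turn the join into an inner join that keeps nothing. A minimal plain-Scala sketch of the two plans (my own toy model, not Spark code; the data matches the tables in the report below):

```scala
// Plain-Scala sketch (not Spark code) of the join rewrite.
val t1 = Seq(-234, 145, 367, 975, 298)                 // t1.int_col_5
val t2 = Seq((-769, -244), (-800, -409), (940, 86),
             (-507, 304), (-367, 158))                 // (int_col_2, int_col_5)

// Correct RIGHT OUTER join on t2.int_col_2 = t1.int_col_5: every t2 row
// survives, with None on the t1 side where no key matches.
val rightOuter: Seq[(Option[Int], (Int, Int))] =
  t2.map(r => (t1.find(_ == r._1), r))

// What the inferred isnotnull filters effectively demand: a non-null
// (i.e. matching) t1 key, which turns the join into an INNER join.
val afterRewrite: Seq[(Int, (Int, Int))] =
  t2.flatMap(r => t1.find(_ == r._1).map(k => (k, r)))

println(rightOuter.size)   // 5 -- all five t2 rows preserved
println(afterRewrite.size) // 0 -- no t1 value matches any t2 key
```

With an empty inner join feeding the aggregate, the whole query produces the empty result set described in the report below.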
> With the following tables:
> {code}
> val t1 = sc.parallelize(Seq(-234, 145, 367, 975, 298)).toDF("int_col_5")
> val t2 = sc.parallelize(
>   Seq(
>     (-769, -244),
>     (-800, -409),
>     (940, 86),
>     (-507, 304),
>     (-367, 158))
> ).toDF("int_col_2", "int_col_5")
> t1.registerTempTable("t1")
> t2.registerTempTable("t2")
> {code}
> run:
> {code}
> SELECT
>   (SUM(COALESCE(t1.int_col_5, t2.int_col_2))),
>   ((COALESCE(t1.int_col_5, t2.int_col_2)) * 2)
> FROM t1
> RIGHT JOIN t2
>   ON (t2.int_col_2) = (t1.int_col_5)
> GROUP BY GREATEST(COALESCE(t2.int_col_5, 109), COALESCE(t1.int_col_5, -449)),
>          COALESCE(t1.int_col_5, t2.int_col_2)
> HAVING (SUM(COALESCE(t1.int_col_5, t2.int_col_2))) > ((COALESCE(t1.int_col_5, t2.int_col_2)) * 2)
> {code}
> In Spark SQL, this returns an empty result set, whereas Postgres returns four
> rows. If I omit the {{HAVING}} clause, I can see the grouped rows that the
> {{HAVING}} clause is incorrectly filtering out:
> {code}
> +--------------------------------------+---------------------------------------+
> | sum(coalesce(int_col_5, int_col_2))  | (coalesce(int_col_5, int_col_2) * 2)  |
> +--------------------------------------+---------------------------------------+
> | -507                                 | -1014                                 |
> | 940                                  | 1880                                  |
> | -769                                 | -1538                                 |
> | -367                                 | -734                                  |
> | -800                                 | -1600                                 |
> +--------------------------------------+---------------------------------------+
> {code}
> Based on this, the output after adding the {{HAVING}} should contain four
> rows, not zero.
> I'm not sure how to further shrink this in a straightforward way, so I'm
> opening this bug to get help in triaging further.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
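As a sanity check on the four-row claim in the report, here is the {{HAVING}} predicate applied by hand to the five grouped rows shown above (a plain-Scala sketch, not Spark):

```scala
// The five grouped rows from the table above, as (sum, coalesce * 2) pairs.
val rows = Seq((-507L, -1014), (940L, 1880), (-769L, -1538),
               (-367L, -734), (-800L, -1600))

// HAVING SUM(COALESCE(...)) > COALESCE(...) * 2
val surviving = rows.filter { case (sum, twice) => sum > twice }

println(surviving.size)       // 4 -- matches the Postgres result
println(rows.diff(surviving)) // List((940,1880)) -- the only row HAVING should drop
```

So a correct plan keeps four of the five rows and drops only the (940, 1880) row, whereas Spark's optimized plan returns none of them.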