This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 073d0b60d31 [SPARK-44251][SQL] Set nullable correctly on coalesced join key in full outer USING join

073d0b60d31 is described below

commit 073d0b60d31bf68ebacdc005f59b928a5902670f
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Tue Jul 11 11:18:44 2023 +0800

    [SPARK-44251][SQL] Set nullable correctly on coalesced join key in full outer USING join

    ### What changes were proposed in this pull request?

    For full outer joins employing USING, set the nullability of the coalesced join columns to true.

    ### Why are the changes needed?

    The following query produces incorrect results:
    ```
    create or replace temp view v1 as values (1, 2), (null, 7) as (c1, c2);
    create or replace temp view v2 as values (2, 3) as (c1, c2);

    select explode(array(c1)) as x
    from v1
    full outer join v2
    using (c1);

    -1   <== should be null
    1
    2
    ```
    The following query fails with a `NullPointerException`:
    ```
    create or replace temp view v1 as values ('1', 2), (null, 7) as (c1, c2);
    create or replace temp view v2 as values ('2', 3) as (c1, c2);

    select explode(array(c1)) as x
    from v1
    full outer join v2
    using (c1);

    23/06/25 17:06:39 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 11)
    java.lang.NullPointerException
            at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.generate_doConsume_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.smj_consumeFullOuterJoinRow_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.wholestagecodegen_findNextJoinRows_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    ...
    ```

    The above full outer joins implicitly add an aliased coalesce to the parent projection of the join: `coalesce(v1.c1, v2.c1) as c1`. When only one side's key is nullable, the coalesce's nullability is false. As a result, the generator's output has nullable set to false. But this is incorrect: if one side has a row with explicit null key values, the other side's row will also have null key values (because the other side's row will be "made up"), and both the `coalesce` and [...]

    While `UpdateNullability` actually repairs the nullability of the `coalesce` before execution, it doesn't recreate the generator output, so the nullability remains incorrect in `Generate#output`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New unit test.

    Closes #41809 from bersprockets/using_oddity2.
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
    (cherry picked from commit 7a27bc68c849041837e521285e33227c3d1f9853)
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++--
 .../test/scala/org/apache/spark/sql/JoinSuite.scala   | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b5242f631bd..d7bba23cf68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3465,8 +3465,14 @@ class Analyzer(override val catalogManager: CatalogManager)
         (rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput,
           leftKeys.map(_.withNullability(true)))
       case FullOuter =>
-        // in full outer join, joinCols should be non-null if there is.
-        val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
+        // In full outer join, we should return non-null values for the join columns
+        // if either side has non-null values for those columns. Therefore, for each
+        // join column pair, add a coalesce to return the non-null value, if it exists.
+        val joinedCols = joinPairs.map { case (l, r) =>
+          // Since this is a full outer join, either side could be null, so we explicitly
+          // set the nullability to true for both sides.
+          Alias(Coalesce(Seq(l.withNullability(true), r.withNullability(true))), l.name)()
+        }
         (joinedCols ++
           lUniqueOutput.map(_.withNullability(true)) ++
           rUniqueOutput.map(_.withNullability(true)),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 3597ef8488b..fc46bdea6a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1529,4 +1529,22 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
       checkAnswer(sql(query), expected)
     }
   }
+
+  test("SPARK-44251: Full outer USING join with null key value") {
+    withTempView("v1", "v2") {
+      sql("create or replace temp view v1 as values (1, 2), (null, 7) as (c1, c2)")
+      sql("create or replace temp view v2 as values (2, 3) as (c1, c2)")
+
+      val query =
+        """select explode(array(c1)) as x
+          |from v1
+          |full outer join v2
+          |using (c1)
+          |""".stripMargin
+
+      val expected = Seq(Row(null), Row(1), Row(2))
+
+      checkAnswer(sql(query), expected)
+    }
+  }
 }
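The nullability problem described in the commit message can be reproduced with a minimal, dependency-free Scala sketch. It uses a toy model rather than Spark's Catalyst classes, so `Col`, `coalesceNullable`, `fullOuterJoin` and `FullOuterUsingSketch` are hypothetical names for illustration only.

The sketch assumes a single rule: a coalesce is nullable only if every child is nullable, which is how Catalyst's `Coalesce.nullable` is defined (`children.forall(_.nullable)`). It then replays the unit test's data to show that the coalesced key can still be null, even though `v2.c1` on its own is non-nullable.

```scala
// Toy model only: Col, coalesceNullable and fullOuterJoin are hypothetical
// names, not Spark's Catalyst API.
object FullOuterUsingSketch {
  // A column with a static nullability flag, loosely like an attribute's nullable.
  final case class Col(name: String, nullable: Boolean) {
    def withNullability(n: Boolean): Col = copy(nullable = n)
  }

  // Same rule as Catalyst's Coalesce: nullable only if every child is nullable.
  def coalesceNullable(children: Seq[Col]): Boolean = children.forall(_.nullable)

  // v1.c1 contains a null, so it is nullable; v2.c1 only holds 2, so it is not.
  val l: Col = Col("c1", nullable = true)
  val r: Col = Col("c1", nullable = false)

  // Before the fix: coalesce(l, r) is declared non-nullable.
  val nullableBefore: Boolean = coalesceNullable(Seq(l, r))

  // After the fix: both inputs are forced nullable before building the coalesce.
  val nullableAfter: Boolean =
    coalesceNullable(Seq(l.withNullability(true), r.withNullability(true)))

  type InRow = (Option[Int], Int) // (c1, c2); None stands for SQL NULL

  // SQL equality: a NULL key never matches anything, not even another NULL.
  private def keysMatch(a: Option[Int], b: Option[Int]): Boolean =
    a.isDefined && a == b

  private def hasMatch(key: Option[Int], other: Seq[InRow]): Boolean =
    other.exists(row => keysMatch(key, row._1))

  // Toy full outer equi-join returning (left key, right key) pairs;
  // an unmatched row is padded with None on the other side.
  def fullOuterJoin(left: Seq[InRow], right: Seq[InRow]): Seq[(Option[Int], Option[Int])] = {
    val matched = for {
      (lk, _) <- left
      (rk, _) <- right
      if keysMatch(lk, rk)
    } yield (lk, rk)
    val leftOnly = left
      .filterNot(row => hasMatch(row._1, right))
      .map(row => (row._1, Option.empty[Int]))
    val rightOnly = right
      .filterNot(row => hasMatch(row._1, left))
      .map(row => (Option.empty[Int], row._1))
    matched ++ leftOnly ++ rightOnly
  }

  // The data from the SPARK-44251 unit test.
  val v1: Seq[InRow] = Seq((Some(1), 2), (None, 7))
  val v2: Seq[InRow] = Seq((Some(2), 3))

  // USING (c1) projects coalesce(v1.c1, v2.c1) as c1.
  val coalescedKeys: Seq[Option[Int]] =
    fullOuterJoin(v1, v2).map { case (lk, rk) => lk.orElse(rk) }

  def main(args: Array[String]): Unit = {
    println(s"nullable before fix: $nullableBefore") // false
    println(s"nullable after fix: $nullableAfter")   // true
    println(s"coalesced keys: $coalescedKeys")       // List(Some(1), None, Some(2))
  }
}
```

The `(null, 7)` row from `v1` finds no partner, so its made-up `v2` side is also null, and the coalesced key is `None`. Ignoring row order, the three keys correspond to the test's expected `Seq(Row(null), Row(1), Row(2))`. This is why the declared nullability of the coalesce must be true.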