This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8911578020f [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure 8911578020f is described below commit 8911578020f8a2428b12dd72cb0ed4b7d747d835 Author: Steven Aerts <steven.ae...@gmail.com> AuthorDate: Tue Aug 8 08:09:05 2023 +0900 [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure ### What changes were proposed in this pull request? Materialize passed join columns as an `IndexedSeq` before passing them to the lower layers. ### Why are the changes needed? When nesting multiple full outer joins using column names which are a `Stream`, the code generator will generate faulty code, resulting in an NPE or a bad `UnsafeRow` access at runtime; see the 2 added test cases, which show an NPE and a bad `UnsafeRow` access. ### Does this PR introduce _any_ user-facing change? No, only a bug fix. ### How was this patch tested? A reproduction scenario was created and added to the code base. Closes #41712 from steven-aerts/SPARK-44132-fix. 
Authored-by: Steven Aerts <steven.ae...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../sql/execution/joins/JoinCodegenSupport.scala | 2 +- .../test/scala/org/apache/spark/sql/JoinSuite.scala | 20 ++++++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 61c83829d20..eda017937d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1092,7 +1092,7 @@ class Dataset[T] private[sql]( Join( joined.left, joined.right, - UsingJoin(JoinType(joinType), usingColumns), + UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq), None, JoinHint.NONE) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala index a7d1edefcd6..6496f9a0006 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala @@ -79,7 +79,7 @@ trait JoinCodegenSupport extends CodegenSupport with BaseJoinExec { setDefaultValue: Boolean): Seq[ExprCode] = { ctx.currentVars = null ctx.INPUT_ROW = row - plan.output.zipWithIndex.map { case (a, i) => + plan.output.toIndexedSeq.zipWithIndex.map { case (a, i) => val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx) if (setDefaultValue) { // the variables are needed even there is no matched rows diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 7f358723eeb..14f1fb27906 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1709,4 +1709,24 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan checkAnswer(sql(query), expected) } } + + test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with NPE") { + val dsA = Seq((1, "a")).toDF("id", "c1") + val dsB = Seq((2, "b")).toDF("id", "c2") + val dsC = Seq((3, "c")).toDF("id", "c3") + val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, Stream("id"), "full_outer") + + val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), Row(3, null, null, "c")) + + checkAnswer(joined, expected) + } + + test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with invalid access") { + val ds = Seq((1, "a")).toDF("id", "c1") + val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, Stream("id"), "full_outer") + + val expected = Seq(Row(1, "a", "a", "a")) + + checkAnswer(joined, expected) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org