This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 1a1cc2e38ae9 [SPARK-57200][SQL] Fix JVM Codegen Bug - NULL for 3-arg 
form with column nullReplacement
1a1cc2e38ae9 is described below

commit 1a1cc2e38ae9c631aaa07923c131e4d551da9de3
Author: Roy Huang <[email protected]>
AuthorDate: Tue Jun 2 15:02:57 2026 +0800

    [SPARK-57200][SQL] Fix JVM Codegen Bug - NULL for 3-arg form with column 
nullReplacement
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a whole-stage codegen (WSCG) correctness bug in ArrayJoin 
(array_join) where the generated code computes the correct joined string but 
discards it as NULL.
    
    `ArrayJoin.doGenCode` initializes `ev.isNull = true` whenever the 
expression is nullable (which is the case when the optional `nullReplacement` 
argument is a nullable column). The actual join is then produced by 
`genCodeForArrayAndDelimiter`, which has two branches:
    
    When array or delimiter is nullable, the body is wrapped in `nullSafeExec` 
and explicitly emits `ev.isNull = false` before building the result. When both 
array and delimiter are non-nullable, the else branch builds the result but 
never resets `ev.isNull`, leaving it at its initialized true.
    
    A minimal reproduction:
    
          SET spark.sql.codegen.wholeStage = true;
          SET spark.sql.codegen.factoryMode = CODEGEN_ONLY;
          -- Returns NULL for every row (buggy):
          SELECT array_join(
                   array('a', 'b'),
                   ',',
                   CASE WHEN id % 2 = 0 THEN 'NR' ELSE CAST(NULL AS STRING) END
                 ) AS r
          FROM range(4);
          SET spark.sql.codegen.wholeStage = false;
          SET spark.sql.codegen.factoryMode = NO_CODEGEN;
          -- Returns ['a,NR,b', NULL, 'a,NR,b', NULL] (correct):
          SELECT array_join(
                   array('a', 'b'),
                   ',',
                   CASE WHEN id % 2 = 0 THEN 'NR' ELSE CAST(NULL AS STRING) END
                 ) AS r
          FROM range(4);
    
    ### Why are the changes needed?
    
    This is a silent correctness bug: `array_join(arr, delimiter, repl)` 
returns `NULL` for every row instead of the joined string, but only under a 
specific (and realistic) combination:
    
    - The third argument nullReplacement is a nullable, non-foldable column, so 
`ArrayJoin.nullable` is true.
    - An upstream `Filter` containing `IsNotNull(array)` (and/or 
`IsNotNull(delimiter)`) tightens those children to non-nullable. 
`FilterExec.output` marks `IsNotNull`-referenced attributes as non-nullable, 
and `UpdateAttributeNullability` propagates this downstream, so 
`genCodeForArrayAndDelimiter` takes the non-nullable else branch.
    - The query stays in whole-stage codegen over a materialized source (e.g. 
`FileScan parquet`, or an `InMemoryRelation` from `CACHE TABLE`). Inline 
`VALUES / WITH` sources are folded by `ConvertToLocalRelation` to interpreted 
`eval()` and therefore do not hit the bug.
    
    Interpreted `eval()` returns the correct result, so the same query produces 
different answers depending on whether codegen kicks in.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. It fixes incorrect results. Previously, `array_join(arr, delimiter, 
nullReplacement)` could return `NULL` for every row under whole-stage codegen 
when nullReplacement was a nullable column and an upstream `IsNotNull` filter 
made the array/delimiter non-nullable. After this change, such queries return 
the correctly joined string, matching interpreted execution. Queries that were 
already correct (2-arg form, literal non-null `nullReplacement`, no upstream 
`IsNotNull` filter, or non [...]
    
    ### How was this patch tested?
    
    Unit testing in `CollectionExpressionsSuite` and `DataFrameFunctionsSuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.8
    
    Closes #56249 from rgyhuang/r-huang_data/rgyhuang/JVM-codegen-fix.
    
    Lead-authored-by: Roy Huang <[email protected]>
    Co-authored-by: Roy Huang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit b0633b099bb9a9652f2ce36673dae94a776b30a8)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../expressions/collectionOperations.scala         |  7 +++++
 .../expressions/CollectionExpressionsSuite.scala   | 27 +++++++++++++++++
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 35 ++++++++++++++++++++++
 3 files changed, 69 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index b3465335ff5c..cf66932d882e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2308,9 +2308,16 @@ case class ArrayJoin(
         }
       }
     } else {
+      // When array and delimiter are both non-nullable, neither nullSafeExec 
wrapper above runs,
+      // so reset ev.isNull here. doGenCode initializes ev.isNull to true 
whenever the expression
+      // is nullable (e.g. a nullable nullReplacement), and without this reset 
the computed result
+      // would be discarded as NULL. When the expression is non-nullable, 
ev.isNull is a literal
+      // false and must not be assigned.
+      val resetIsNull = if (nullable) s"${ev.isNull} = false;" else ""
       s"""
          |${arrayGen.code}
          |${delimiterGen.code}
+         |$resetIsNull
          |$resultCode""".stripMargin
     }
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index e8da77e83433..0cf269b8360e 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -962,6 +962,33 @@ class CollectionExpressionsSuite
       Some(Literal.create(null, StringType))), null)
   }
 
+  test("ArrayJoin codegen with non-nullable array/delimiter and nullable " +
+    "nullReplacement") {
+    // When an upstream IsNotNull filter tightens the array and delimiter to
+    // non-nullable but the nullReplacement is a nullable column, 
ArrayJoin.nullable is true so
+    // doGenCode initializes ev.isNull = true. The non-nullable branch of
+    // genCodeForArrayAndDelimiter must still reset ev.isNull = false, 
otherwise codegen builds the
+    // joined string but discards it as NULL while interpreted eval() returns 
the correct result.
+    val arr = BoundReference(0, ArrayType(StringType, containsNull = true), 
nullable = false)
+    val delimiter = BoundReference(1, StringType, nullable = false)
+    val nullReplacement = BoundReference(2, StringType, nullable = true)
+    val arrayJoin = ArrayJoin(arr, delimiter, Some(nullReplacement))
+    // ArrayJoin is nullable only because nullReplacement is nullable.
+    assert(arrayJoin.nullable)
+
+    // Non-null replacement: NULL array elements are replaced and a joined 
string is produced.
+    checkEvaluation(
+      arrayJoin,
+      "a,NR,b",
+      create_row(Seq[String]("a", null, "b"), ",", "NR"))
+
+    // Null replacement value: the whole result is NULL, matching eval().
+    checkEvaluation(
+      arrayJoin,
+      null,
+      create_row(Seq[String]("a", null, "b"), ",", null))
+  }
+
   test("ArraysZip") {
     val literals = Seq(
       Literal.create(Seq(9001, 9002, 9003, null), ArrayType(IntegerType)),
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 8f3098bedccc..b92b9c08c458 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
 import 
org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, UTC}
+import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -1993,6 +1994,40 @@ class DataFrameFunctionsSuite extends SharedSparkSession 
{
     )
   }
 
+  test("array_join with nullable nullReplacement under whole-stage codegen") {
+    // With a nullable nullReplacement column and an upstream IsNotNull
+    // filter that tightens the array (and delimiter) to non-nullable, 
whole-stage codegen used to
+    // build the joined string but leave ev.isNull = true, discarding every 
row as NULL. The result
+    // must match interpreted eval(). The source is materialized via a cached 
temp view (an
+    // InMemoryRelation), so the plan is not folded to interpreted eval by 
ConvertToLocalRelation.
+    withTempView("array_join_codegen") {
+      Seq(
+        (Seq[String]("a", null, "b"), ",", "NR"),
+        (Seq[String]("a", null, "b"), ",", null),
+        (Seq[String]("x", "y"), "-", "NR")
+      ).toDF("arr", "delim_col", 
"repl_col").createOrReplaceTempView("array_join_codegen")
+      spark.catalog.cacheTable("array_join_codegen")
+
+      val query =
+        "SELECT array_join(arr, delim_col, repl_col) FROM array_join_codegen " 
+
+          "WHERE arr IS NOT NULL AND delim_col IS NOT NULL"
+
+      withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY") {
+        val df = sql(query)
+        assert(
+          
df.queryExecution.executedPlan.exists(_.isInstanceOf[WholeStageCodegenExec]),
+          "expected the array_join query to run inside whole-stage codegen")
+        checkAnswer(df, Seq(Row("a,NR,b"), Row(null), Row("x-y")))
+      }
+
+      withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+          SQLConf.CODEGEN_FACTORY_MODE.key -> "NO_CODEGEN") {
+        checkAnswer(sql(query), Seq(Row("a,NR,b"), Row(null), Row("x-y")))
+      }
+    }
+  }
+
   test("array_min function") {
     val df = Seq(
       Seq[Option[Int]](Some(1), Some(3), Some(2)),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to