This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 497d17f38a1 [SPARK-39496][SQL] Handle null struct in `Inline.eval` 497d17f38a1 is described below commit 497d17f38a13fa1ac883c1f628b53b85c8a35085 Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Sat Jun 18 09:25:11 2022 +0900 [SPARK-39496][SQL] Handle null struct in `Inline.eval` Change `Inline.eval` to return a row of null values rather than a null row in the case of a null input struct. Consider the following query: ``` set spark.sql.codegen.wholeStage=false; select inline(array(named_struct('a', 1, 'b', 2), null)); ``` This query fails with a `NullPointerException`: ``` 22/06/16 15:10:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$11(GenerateExec.scala:122) ``` (In Spark 3.1.3, you don't need to set `spark.sql.codegen.wholeStage` to false to reproduce the error, since Spark 3.1.3 has no codegen path for `Inline`). This query fails regardless of the setting of `spark.sql.codegen.wholeStage`: ``` val dfWide = (Seq((1)) .toDF("col0") .selectExpr(Seq.tabulate(99)(x => s"$x as col${x + 1}"): _*)) val df = (dfWide .selectExpr("*", "array(named_struct('a', 1, 'b', 2), null) as struct_array")) df.selectExpr("*", "inline(struct_array)").collect ``` It fails with ``` 22/06/16 15:18:55 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:80) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_8$(Unknown Source) ``` When `Inline.eval` returns a null row in the collection, GenerateExec gets a NullPointerException either when joining the null row with required child output, or projecting the null row. 
This PR avoids producing the null row and produces a row of null values instead: ``` spark-sql> set spark.sql.codegen.wholeStage=false; spark.sql.codegen.wholeStage false Time taken: 3.095 seconds, Fetched 1 row(s) spark-sql> select inline(array(named_struct('a', 1, 'b', 2), null)); 1 2 NULL NULL Time taken: 1.214 seconds, Fetched 2 row(s) spark-sql> ``` Does this PR introduce any user-facing change? No. How was this patch tested? New unit test. Closes #36903 from bersprockets/inline_eval_null_struct_issue. Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit c4d5390dd032d17a40ad50e38f0ed7bd9bbd4698) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../apache/spark/sql/catalyst/expressions/generators.scala | 8 ++++++-- .../scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 13 ++++++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 1079f0a333d..d305b4d3700 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -452,13 +452,17 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene private lazy val numFields = elementSchema.fields.length + private lazy val generatorNullRow = new GenericInternalRow(elementSchema.length) + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { val inputArray = child.eval(input).asInstanceOf[ArrayData] if (inputArray == null) { Nil } else { - for (i <- 0 until inputArray.numElements()) - yield inputArray.getStruct(i, numFields) + for (i <- 0 until inputArray.numElements()) yield { + val s = inputArray.getStruct(i, numFields) + if (s == null) generatorNullRow else s + } } } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index d41e87f516f..4ce9fc3a17b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.trees.LeafLike import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} @@ -365,7 +366,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { Row(1, 6) :: Row(3, 6) :: Nil) } - test("SPARK-39061: inline should handle null struct") { + def testNullStruct(): Unit = { val df = sql( """select * from values |( @@ -389,6 +390,16 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { sql("select a, inline(b) from t1"), Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) :: Nil) } + + test("SPARK-39061: inline should handle null struct") { + testNullStruct + } + + test("SPARK-39496: inline eval path should handle null struct") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + testNullStruct + } + } } case class EmptyGenerator() extends Generator with LeafLike[Expression] { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org