Herman van Hövell created SPARK-51356:
-----------------------------------------
Summary: FilterExec incorrectly reorders IsNotNull predicates for
nested access
Key: SPARK-51356
URL: https://issues.apache.org/jira/browse/SPARK-51356
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.5.5, 4.0.0
Reporter: Herman van Hövell
In whole stage code generation FilterExec reorders IsNotNull to make sure we
only materialize fields when we have to. This unfortunately does not seem to
work in cases where we access nested data.
For example:
{noformat}
case class A(b: B = null)
case class B(c: C = null)
case class C(d: Int)
val data = Seq(
A(null),
A(B(null)), // This causes the failure
A(B(C(0))),
A(B(C(1))))
// Map is needed to prevent the local relation from being optimized away.
val aValues = spark.createDataset(data).map(i => i)
val bValues = aValues.where(col("b").isNotNull).select(col("b").as[B])
val isDZero = udf((c: C) => c.d == 0)
// Select all from b where c is not null and c.d != 0
val result = bValues
.filter(col("c").isNotNull)
.filter(not(isDZero(col("c"))))
// KABOOM!
result.show()
{noformat}
This yields the following plan:
{noformat}
== Parsed Logical Plan ==
'Filter '`!`(UDF('c))
+- Filter isnotnull(c#18)
+- Project [b#11.c AS c#18]
+- Project [b#11]
+- Filter isnotnull(b#11)
+- SerializeFromObject [if
(isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A,
true])).b()))) null else named_struct(c, if
(isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0,
$line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d,
invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0,
$line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
+- MapElements
$line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class
$line15.$read$$iw$A,
[StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)],
obj#9: $line15.$read$$iw$A
+- DeserializeToObject newInstance(class
$line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
+- LocalRelation [b#1]

== Analyzed Logical Plan ==
c: struct<d:int>
Filter NOT UDF(c#18)
+- Filter isnotnull(c#18)
+- Project [b#11.c AS c#18]
+- Project [b#11]
+- Filter isnotnull(b#11)
+- SerializeFromObject [if
(isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A,
true])).b()))) null else named_struct(c, if
(isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0,
$line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d,
invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0,
$line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
+- MapElements
$line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class
$line15.$read$$iw$A,
[StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)],
obj#9: $line15.$read$$iw$A
+- DeserializeToObject newInstance(class
$line15.$read$$iw$A), obj#7: $line15.$read$$iw$A
+- LocalRelation [b#1]

== Optimized Logical Plan ==
Project [b#11.c AS c#18]
+- Filter (isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c)))
+- SerializeFromObject [if
(isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A,
true])).b()))) null else named_struct(c, if
(isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0,
$line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d,
invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0,
$line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
+- MapElements
$line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, class
$line15.$read$$iw$A,
[StructField(b,StructType(StructField(c,StructType(StructField(d,IntegerType,false)),true)),true)],
obj#9: $line15.$read$$iw$A
+- DeserializeToObject newInstance(class $line15.$read$$iw$A), obj#7:
$line15.$read$$iw$A
+- LocalRelation [b#1]

== Physical Plan ==
*(1) Project [b#11.c AS c#18]
+- *(1) Filter (isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c)))
+- *(1) SerializeFromObject [if
(isnull(invoke(knownnotnull(assertnotnull(input[0, $line15.$read$$iw$A,
true])).b()))) null else named_struct(c, if
(isnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0,
$line15.$read$$iw$A, true])).b())).c()))) null else named_struct(d,
invoke(knownnotnull(invoke(knownnotnull(invoke(knownnotnull(assertnotnull(input[0,
$line15.$read$$iw$A, true])).b())).c())).d()))) AS b#11]
+- *(1) MapElements
$line15.$read$$iw$$Lambda$3174/0x000000c801ba7518@70b62924, obj#9:
$line15.$read$$iw$A
+- *(1) DeserializeToObject newInstance(class $line15.$read$$iw$A),
obj#7: $line15.$read$$iw$A
+- *(1) LocalTableScan [b#1]
{noformat}
The optimizer has collapsed the filters and moved the project above them. Now
we have a single filter
{{isnotnull(b#11) AND (isnotnull(b#11.c) AND NOT UDF(b#11.c))}}. In
WholeStageCodegen the second IsNotNull predicate, {{isnotnull(b#11.c)}}, is
moved after the UDF invocation. As a result we now pass a null into the UDF,
which that null check was supposed to guard against. In this case we don't
even invoke the UDF because the encoder fails first (which is also a bug). The
IsNotNull reordering (and this bug) has been there since we introduced whole
stage code generation.
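The ordering hazard can be illustrated without Spark. The following is a
minimal sketch in plain Scala, not the generated code: the names
{{ReorderSketch}}, {{guarded}} and {{reordered}} are hypothetical, and
{{isDZero}} is a plain function standing in for the UDF. It only models why a
null check must run before the expression it guards. In the actual report the
failure surfaces in the encoder before the UDF is invoked.

```scala
// Toy model, not Spark code: a row holds a nullable struct b with a
// nullable nested field c.
final case class C(d: Int)
final case class B(c: C)

object ReorderSketch {
  // Stand-in for the UDF in the report. It dereferences c, so it must
  // never be handed a null.
  val isDZero: C => Boolean = c => c.d == 0

  // Predicates in the order the optimizer produced them:
  // isnotnull(b) AND isnotnull(b.c) AND NOT udf(b.c)
  def guarded(b: B): Boolean =
    b != null && b.c != null && !isDZero(b.c)

  // Hypothetical reordering: the nested null check now runs after the UDF,
  // so it no longer protects the UDF call.
  def reordered(b: B): Boolean =
    b != null && !isDZero(b.c) && b.c != null

  def main(args: Array[String]): Unit = {
    val rows = Seq(null, B(null), B(C(0)), B(C(1)))

    // Short-circuit evaluation keeps the UDF away from the B(null) row.
    println(rows.map(guarded))

    // With the reordered predicates the B(null) row reaches the UDF and
    // throws a NullPointerException, which Try captures as a Failure.
    println(scala.util.Try(rows.map(reordered)).isFailure)
  }
}
```

Both variants contain the same three predicates; only the evaluation order
differs, which is why reordering is only safe when the moved null check does
not guard a later expression.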
This particular case triggers in Spark 4 and not in earlier versions because
we improved typed select. Typed select used to use a map to flatten results if
the typed column returned a struct; now we use star expansion (which is
cheaper). This allows the optimizer to be more aggressive and fuse the two
filters. Reverting that change would fix this particular scenario; however, it
is easy to trigger this in older versions by using
{{val bValues = aValues.where(col("b").isNotNull).select(col("b.*")).as[B]}}.
This problem can be mitigated either by disabling whole stage code generation
({{spark.conf.set("spark.sql.codegen.wholeStage", false)}}) or by introducing
a barrier for optimization (e.g.
{{val bValues = aValues.where(col("b").isNotNull).map(_.b)}}).