[ https://issues.apache.org/jira/browse/SPARK-45766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Szul updated SPARK-45766:
-------------------------------
    Attachment: prunning_bug.scala

> ObjectSerializerPruning fails to align null types in custom serializer 'If' expressions.
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-45766
>                 URL: https://issues.apache.org/jira/browse/SPARK-45766
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.3, 3.4.1, 3.5.0
>            Reporter: Piotr Szul
>            Priority: Minor
>         Attachments: prunning_bug.scala
>
>
> We have a custom encoder for union-like objects.
> Our custom serializer uses an expression of the form:
> {{If(IsNull(If(.....)), Literal(null), NamedStruct(....))}}
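>
> For illustration, here is a minimal sketch of that serializer shape built directly from Catalyst expressions. The field names, types and bound ordinals are made-up stand-ins, not the actual encoder from the attachment; the null branch is given the struct's data type, consistent with the types reported in the exception below.
> {code:scala}
> import org.apache.spark.sql.catalyst.expressions._
> import org.apache.spark.sql.types._
>
> // Illustrative stand-ins; the real encoder extracts these from the input object.
> val hasValue = BoundReference(0, BooleanType, nullable = false)
> val value    = BoundReference(1, StringType, nullable = true)
>
> // Struct produced when the union actually holds a value.
> val struct = CreateNamedStruct(Seq(Literal("given"), value))
>
> // Serializer shape: If(IsNull(If(...)), Literal(null), NamedStruct(...)).
> val serializer = If(
>   IsNull(If(hasValue, value, Literal(null, StringType))),
>   Literal(null, struct.dataType),
>   struct)
> {code}
>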
> Using this encoder in a query where the {{org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning}} rule applies results in the exception below.
> This happens because the expression is transformed by the {{PushFoldableIntoBranches}} rule prior to {{ObjectSerializerPruning}}, which changes it to:
> {{If(If(.....), Literal(null), NamedStruct(....))}}
> which no longer matches the pattern for which null type alignment is performed.
> See the attached Scala REPL code ({{prunning_bug.scala}}) for a demonstration of this issue; a simplified sketch of the rewrite follows below.
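>
> For illustration, a minimal sketch (with made-up column names, not the attached repro) of how the optimizer rewrite removes the {{IsNull}} predicate that the pruning rule looks for:
> {code:scala}
> import org.apache.spark.sql.catalyst.expressions._
> import org.apache.spark.sql.catalyst.optimizer.PushFoldableIntoBranches
> import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
> import org.apache.spark.sql.types._
>
> val hasValue = AttributeReference("hasValue", BooleanType, nullable = false)()
> val value    = AttributeReference("value", StringType)()
>
> // Serializer-style predicate: IsNull over an If with a foldable branch.
> val predicate = IsNull(If(hasValue, value, Literal(null, StringType)))
> val plan = Project(Seq(Alias(predicate, "p")()), LocalRelation(hasValue, value))
>
> // PushFoldableIntoBranches pushes the IsNull into the If's branches, so the
> // top-level expression becomes an If rather than an IsNull; after constant
> // folding it reads If(hasValue, IsNull(value), true), as in the error below.
> val rewritten = PushFoldableIntoBranches(plan)
> {code}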
>  
> The exception:
>
> java.lang.IllegalArgumentException: requirement failed: All input types must be the same except nullable, containsNull, valueContainsNull flags. The expression is: if (if (assertnotnull(input[0, UnionType, true]).hasValue) isnull(assertnotnull(input[0, UnionType, true]).value) else true) null else named_struct(given, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, UnionType, true])).value).given, true, false, true)). The input types found are
>   StructType(StructField(given,StringType,true),StructField(family,StringType,true))
>   StructType(StructField(given,StringType,true)).
>   at scala.Predef$.require(Predef.scala:281)
>   at org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck(Expression.scala:1304)
>   at org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck$(Expression.scala:1297)
>   at org.apache.spark.sql.catalyst.expressions.If.dataTypeCheck(conditionalExpressions.scala:41)
>   at org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType(Expression.scala:1309)
>   at org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType$(Expression.scala:1308)
>   at org.apache.spark.sql.catalyst.expressions.If.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType$lzycompute(conditionalExpressions.scala:41)
>   at org.apache.spark.sql.catalyst.expressions.If.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType(conditionalExpressions.scala:41)
>   at org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType(Expression.scala:1313)
>   at org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType$(Expression.scala:1313)
>   at org.apache.spark.sql.catalyst.expressions.If.dataType(conditionalExpressions.scala:41)
>   at org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:166)
>   at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.pruneSerializer(objects.scala:209)
>   at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.$anonfun$applyOrElse$3(objects.scala:230)
>   at scala.collection.immutable.List.map(List.scala:293)
>   at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.applyOrElse(objects.scala:229)
>   at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.applyOrElse(objects.scala:217)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:427)
>   at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:217)
>   at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:125)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)


