[ 
https://issues.apache.org/jira/browse/SPARK-45766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Szul updated SPARK-45766:
-------------------------------
    Description: 
We have a custom encoder for union like objects. 

The our custom serializer uses an expression like:

{{If(IsNull(If(.....)), Literal(null), NamedStruct(....)))}}

Using this encoder in a SQL expression that applies the 

`org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning`  rule  
results in the exception below.

It's because the expression it transformed by `PushFoldableIntoBranches' rule 
prior to `ObjectSerializerPruning`, which changes the expression to:

{{If(If(.....), Literal(null), NamedStruct(....)))}}

which no longer matches the expressions for which null type alignment is 
performed.

See the attached scala repl code for the demonstration of this issue.

 

The exception:

 

java.lang.IllegalArgumentException: requirement failed: All input types must be 
the same except nullable, containsNull, valueContainsNull flags. The expression 
is: if (if (assertnotnull(input[0, UnionType, true]).hasValue) 
isnull(assertnotnull(input[0, UnionType, true]).value) else true) null else 
named_struct(given, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
knownnotnull(knownnotnull(assertnotnull(input[0, UnionType, 
true])).value).given, true, false, true)). The input types found are

StructType(StructField(given,StringType,true),StructField(family,StringType,true))

StructType(StructField(given,StringType,true)).

  at scala.Predef$.require(Predef.scala:281)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck(Expression.scala:1304)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck$(Expression.scala:1297)

  at 
org.apache.spark.sql.catalyst.expressions.If.dataTypeCheck(conditionalExpressions.scala:41)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType(Expression.scala:1309)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType$(Expression.scala:1308)

  at 
org.apache.spark.sql.catalyst.expressions.If.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType$lzycompute(conditionalExpressions.scala:41)

  at 
org.apache.spark.sql.catalyst.expressions.If.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType(conditionalExpressions.scala:41)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType(Expression.scala:1313)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType$(Expression.scala:1313)

  at 
org.apache.spark.sql.catalyst.expressions.If.dataType(conditionalExpressions.scala:41)

  at 
org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:166)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.pruneSerializer(objects.scala:209)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.$anonfun$applyOrElse$3(objects.scala:230)

  at scala.collection.immutable.List.map(List.scala:293)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.applyOrElse(objects.scala:229)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.applyOrElse(objects.scala:217)

  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)

  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)

  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)

  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)

  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)

  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)

  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)

  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)

  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:427)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:217)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:125)

  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)

 

  was:
We have a custom encoder for union like objects. 

The our custom serializer uses an expression like:

{{If(IsNull(If(.....)), Literal(null), NamedStruct(....)))}}

Using this encoder in a SQL expression that applies the 

`org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning`  rule  
results in the exception below.

It's because the expression it transformed by `PushFoldableIntoBranches' rule 
prior to `ObjectSerializerPruning`, which changes the expression to:

{{If(If(.....), Literal(null), NamedStruct(....)))}}

which no longer matches the expression for which null type alignment is 
performed.

See the attached scala repl code for the demonstration of this issue.

 

The exception:

 

java.lang.IllegalArgumentException: requirement failed: All input types must be 
the same except nullable, containsNull, valueContainsNull flags. The expression 
is: if (if (assertnotnull(input[0, UnionType, true]).hasValue) 
isnull(assertnotnull(input[0, UnionType, true]).value) else true) null else 
named_struct(given, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
knownnotnull(knownnotnull(assertnotnull(input[0, UnionType, 
true])).value).given, true, false, true)). The input types found are

 
StructType(StructField(given,StringType,true),StructField(family,StringType,true))

 StructType(StructField(given,StringType,true)).

  at scala.Predef$.require(Predef.scala:281)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck(Expression.scala:1304)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck$(Expression.scala:1297)

  at 
org.apache.spark.sql.catalyst.expressions.If.dataTypeCheck(conditionalExpressions.scala:41)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType(Expression.scala:1309)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType$(Expression.scala:1308)

  at 
org.apache.spark.sql.catalyst.expressions.If.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType$lzycompute(conditionalExpressions.scala:41)

  at 
org.apache.spark.sql.catalyst.expressions.If.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType(conditionalExpressions.scala:41)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType(Expression.scala:1313)

  at 
org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType$(Expression.scala:1313)

  at 
org.apache.spark.sql.catalyst.expressions.If.dataType(conditionalExpressions.scala:41)

  at 
org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:166)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.pruneSerializer(objects.scala:209)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.$anonfun$applyOrElse$3(objects.scala:230)

  at scala.collection.immutable.List.map(List.scala:293)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.applyOrElse(objects.scala:229)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.applyOrElse(objects.scala:217)

  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)

  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)

  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)

  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)

  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)

  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)

  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)

  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)

  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:427)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:217)

  at 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:125)

  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)

 


> ObjectSerializerPruning fails to align null types in custom serializer 'If' 
> expressions.
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-45766
>                 URL: https://issues.apache.org/jira/browse/SPARK-45766
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.3, 3.4.1, 3.5.0
>            Reporter: Piotr Szul
>            Priority: Minor
>         Attachments: prunning_bug.scala
>
>
> We have a custom encoder for union like objects. 
> The our custom serializer uses an expression like:
> {{If(IsNull(If(.....)), Literal(null), NamedStruct(....)))}}
> Using this encoder in a SQL expression that applies the 
> `org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning`  rule  
> results in the exception below.
> It's because the expression it transformed by `PushFoldableIntoBranches' rule 
> prior to `ObjectSerializerPruning`, which changes the expression to:
> {{If(If(.....), Literal(null), NamedStruct(....)))}}
> which no longer matches the expressions for which null type alignment is 
> performed.
> See the attached scala repl code for the demonstration of this issue.
>  
> The exception:
>  
> java.lang.IllegalArgumentException: requirement failed: All input types must 
> be the same except nullable, containsNull, valueContainsNull flags. The 
> expression is: if (if (assertnotnull(input[0, UnionType, true]).hasValue) 
> isnull(assertnotnull(input[0, UnionType, true]).value) else true) null else 
> named_struct(given, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> knownnotnull(knownnotnull(assertnotnull(input[0, UnionType, 
> true])).value).given, true, false, true)). The input types found are
> StructType(StructField(given,StringType,true),StructField(family,StringType,true))
> StructType(StructField(given,StringType,true)).
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck(Expression.scala:1304)
>   at 
> org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck$(Expression.scala:1297)
>   at 
> org.apache.spark.sql.catalyst.expressions.If.dataTypeCheck(conditionalExpressions.scala:41)
>   at 
> org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType(Expression.scala:1309)
>   at 
> org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType$(Expression.scala:1308)
>   at 
> org.apache.spark.sql.catalyst.expressions.If.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType$lzycompute(conditionalExpressions.scala:41)
>   at 
> org.apache.spark.sql.catalyst.expressions.If.org$apache$spark$sql$catalyst$expressions$ComplexTypeMergingExpression$$internalDataType(conditionalExpressions.scala:41)
>   at 
> org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType(Expression.scala:1313)
>   at 
> org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType$(Expression.scala:1313)
>   at 
> org.apache.spark.sql.catalyst.expressions.If.dataType(conditionalExpressions.scala:41)
>   at 
> org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:166)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.pruneSerializer(objects.scala:209)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.$anonfun$applyOrElse$3(objects.scala:230)
>   at scala.collection.immutable.List.map(List.scala:293)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.applyOrElse(objects.scala:229)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$8.applyOrElse(objects.scala:217)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:427)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:217)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:125)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to