[ 
https://issues.apache.org/jira/browse/SPARK-32159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-32159.
---------------------------------
    Fix Version/s: 3.1.0
                   3.0.1
       Resolution: Fixed

Issue resolved by pull request 28983
[https://github.com/apache/spark/pull/28983]

> New udaf(Aggregator) has an integration bug with UnresolvedMapObjects 
> serialization
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-32159
>                 URL: https://issues.apache.org/jira/browse/SPARK-32159
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Erik Erlandson
>            Assignee: Erik Erlandson
>            Priority: Major
>             Fix For: 3.0.1, 3.1.0
>
>
> The new user-defined aggregator feature (SPARK-27296), based on calling 
> 'functions.udaf(aggregator)', works fine when the aggregator input type is 
> atomic, e.g. 'Aggregator[Double, _, _]'. However, if the input type is an 
> array, like 'Aggregator[Array[Double], _, _]', it trips over the 
> following:
> /**
>  * When constructing [[MapObjects]], the element type must be given, which may not be available
>  * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
>  * [[MapObjects]] during analysis after the input data is resolved.
>  * Note that, ideally we should not serialize and send unresolved expressions to executors, but
>  * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
>  * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
>  * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
>  * it's just a performance issue(more network traffic), and will not fail.
>  */
> case class UnresolvedMapObjects(
>     @transient function: Expression => Expression,
>     child: Expression,
>     customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
>   override lazy val resolved = false
>
>   override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
>     throw new UnsupportedOperationException("not resolved")
>   }
> }
>  
> *The '@transient' annotation causes 'function' to be deserialized as 'null' 
> on the executors, which leads to a NullPointerException here, when 
> 'MapObjects.apply' calls 'function(loopVar)':*
> object MapObjects {
>   def apply(
>       function: Expression => Expression,
>       inputData: Expression,
>       elementType: DataType,
>       elementNullable: Boolean = true,
>       customCollectionCls: Option[Class[_]] = None): MapObjects = {
>     val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
>     MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
>   }
> }
> *I believe it may be possible to just use 'loopVar' instead of 
> 'function(loopVar)' whenever 'function' is null, but I need a second opinion 
> from Catalyst developers on what a robust fix should be.*
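
The null behaviour described in the report can be reproduced without Spark. The following is a minimal sketch, not Spark code: 'Placeholder', 'TransientDemo' and 'roundTrip' are hypothetical names invented for illustration, and plain Java serialization stands in for however the expression is actually shipped to executors. It only shows that a '@transient' function field of a case class is expected to come back as null after a serialization round trip, so applying it fails.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for UnresolvedMapObjects: a case class whose
// function-typed constructor field is marked @transient.
case class Placeholder(@transient function: Int => Int, child: Int)

object TransientDemo {
  // Round-trip a value through Java serialization (an assumption standing in
  // for shipping an object from the driver to an executor).
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(Placeholder(_ + 1, child = 41))

    // Non-transient fields survive the round trip; the transient one does not.
    println(s"child = ${restored.child}")
    println(s"function is null: ${restored.function == null}")

    // Applying the missing function is where the failure surfaces.
    try restored.function(1)
    catch { case e: NullPointerException => println(s"caught: $e") }
  }
}
```

This is only an illustration of the failure mode; it says nothing about how the linked pull request resolves the issue.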



