[ 
https://issues.apache.org/jira/browse/SPARK-32159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Erlandson updated SPARK-32159:
-----------------------------------
    Description: 
The new user defined aggregator feature (SPARK-27296) based on calling 
'functions.udaf(aggregator)' works fine when the aggregator input type is 
atomic, e.g. 'Aggregator[Double, _, _]', however if the input type is an array, 
like 'Aggregator[Array[Double], _, _]',  it is tripping over the following:

/**
 * When constructing [[MapObjects]], the element type must be given, which may 
not be available
 * before analysis. This class acts like a placeholder for [[MapObjects]], and 
will be replaced by
 * [[MapObjects]] during analysis after the input data is resolved.
 * Note that, ideally we should not serialize and send unresolved expressions 
to executors, but
 * users may accidentally do this(e.g. mistakenly reference an encoder instance 
when implementing
 * Aggregator). Here we mark `function` as transient because it may reference 
scala Type, which is
 * not serializable. Then even users mistakenly reference unresolved expression 
and serialize it,
 * it's just a performance issue(more network traffic), and will not fail.
 */
 case class UnresolvedMapObjects(
 {color:#de350b}@transient function: Expression => Expression{color},
 child: Expression,
 customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with 
Unevaluable {
 override lazy val resolved = false

override def dataType: DataType = 
customCollectionCls.map(ObjectType.apply).getOrElse

{ throw new UnsupportedOperationException("not resolved") }

}

 

*The '@transient' is causing the function to be unpacked as 'null' over on the 
executors, and it is causing a null-pointer exception here, when it tries to do 
'function(loopVar)'*

{{object MapObjects {
 def apply(
 function: Expression => Expression,
 inputData: Expression,
 elementType: DataType,
 elementNullable: Boolean = true,
 customCollectionCls: Option[Class[_]] = None): MapObjects =

{ val loopVar = LambdaVariable("MapObject", elementType, elementNullable) 
MapObjects(loopVar, {color:#de350b}function(loopVar){color}, inputData, 
customCollectionCls) }

}
 }}

*I believe it may be possible to just use 'loopVar' instead of 
'function(loopVar)', whenever 'function' is null, but need second opinion from 
catalyst developers on what a robust fix should be*

  was:
The new user defined aggregator feature (SPARK-27296) based on calling 
'functions.udaf(aggregator)' works fine when the aggregator input type is 
atomic, e.g. 'Aggregator[Double, _, _]', however if the input type is an array, 
like 'Aggregator[Array[Double], _, _]',  it is tripping over the following:

{{/**
 * When constructing [[MapObjects]], the element type must be given, which may 
not be available
 * before analysis. This class acts like a placeholder for [[MapObjects]], and 
will be replaced by
 * [[MapObjects]] during analysis after the input data is resolved.
 * Note that, ideally we should not serialize and send unresolved expressions 
to executors, but
 * users may accidentally do this(e.g. mistakenly reference an encoder instance 
when implementing
 * Aggregator). Here we mark `function` as transient because it may reference 
scala Type, which is
 * not serializable. Then even users mistakenly reference unresolved expression 
and serialize it,
 * it's just a performance issue(more network traffic), and will not fail.
 */
case class UnresolvedMapObjects(
    @transient function: Expression => Expression,
    child: Expression,
    customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with 
Unevaluable {
  override lazy val resolved = false

  override def dataType: DataType = 
customCollectionCls.map(ObjectType.apply).getOrElse {
    throw new UnsupportedOperationException("not resolved")
  }
}}}

The '@transient' is causing the function to be unpacked as 'null' over on the 
executors, and it is causing a null-pointer exception here, when it tries to do 
'function(loopVar)'

{{object MapObjects {
  def apply(
      function: Expression => Expression,
      inputData: Expression,
      elementType: DataType,
      elementNullable: Boolean = true,
      customCollectionCls: Option[Class[_]] = None): MapObjects = {
    val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
    MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
  }
}
}}

I believe it may be possible to just use 'loopVar' instead of 
'function(loopVar)', whenever 'function' is null, but need second opinion from 
catalyst developers on what a robust fix should be


> New udaf(Aggregator) has an integration bug with UnresolvedMapObjects 
> serialization
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-32159
>                 URL: https://issues.apache.org/jira/browse/SPARK-32159
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Erik Erlandson
>            Priority: Major
>
> The new user defined aggregator feature (SPARK-27296) based on calling 
> 'functions.udaf(aggregator)' works fine when the aggregator input type is 
> atomic, e.g. 'Aggregator[Double, _, _]', however if the input type is an 
> array, like 'Aggregator[Array[Double], _, _]',  it is tripping over the 
> following:
> /**
>  * When constructing [[MapObjects]], the element type must be given, which 
> may not be available
>  * before analysis. This class acts like a placeholder for [[MapObjects]], 
> and will be replaced by
>  * [[MapObjects]] during analysis after the input data is resolved.
>  * Note that, ideally we should not serialize and send unresolved expressions 
> to executors, but
>  * users may accidentally do this(e.g. mistakenly reference an encoder 
> instance when implementing
>  * Aggregator). Here we mark `function` as transient because it may reference 
> scala Type, which is
>  * not serializable. Then even users mistakenly reference unresolved 
> expression and serialize it,
>  * it's just a performance issue(more network traffic), and will not fail.
>  */
>  case class UnresolvedMapObjects(
>  {color:#de350b}@transient function: Expression => Expression{color},
>  child: Expression,
>  customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with 
> Unevaluable {
>  override lazy val resolved = false
> override def dataType: DataType = 
> customCollectionCls.map(ObjectType.apply).getOrElse
> { throw new UnsupportedOperationException("not resolved") }
> }
>  
> *The '@transient' is causing the function to be unpacked as 'null' over on 
> the executors, and it is causing a null-pointer exception here, when it tries 
> to do 'function(loopVar)'*
> {{object MapObjects {
>  def apply(
>  function: Expression => Expression,
>  inputData: Expression,
>  elementType: DataType,
>  elementNullable: Boolean = true,
>  customCollectionCls: Option[Class[_]] = None): MapObjects =
> { val loopVar = LambdaVariable("MapObject", elementType, elementNullable) 
> MapObjects(loopVar, {color:#de350b}function(loopVar){color}, inputData, 
> customCollectionCls) }
> }
>  }}
> *I believe it may be possible to just use 'loopVar' instead of 
> 'function(loopVar)', whenever 'function' is null, but need second opinion 
> from catalyst developers on what a robust fix should be*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to