Erik Erlandson created SPARK-32159:
--------------------------------------

             Summary: New udaf(Aggregator) has an integration bug with 
UnresolvedMapObjects serialization
                 Key: SPARK-32159
                 URL: https://issues.apache.org/jira/browse/SPARK-32159
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Erik Erlandson


The new user-defined aggregator feature (SPARK-27296), based on calling 
'functions.udaf(aggregator)', works fine when the aggregator input type is 
atomic, e.g. 'Aggregator[Double, _, _]'. However, if the input type is an 
array, such as 'Aggregator[Array[Double], _, _]', it fails with a 
null-pointer exception on the executors.
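As an illustrative sketch (the 'ArraySum' aggregator, the column name, and the 
'spark' session are hypothetical, not taken from an actual report), something 
like this is enough to reproduce it:

{{
import org.apache.spark.sql.{Encoder, Encoders, functions}
import org.apache.spark.sql.expressions.Aggregator

// Aggregator whose input type is an array -- the failing case.
object ArraySum extends Aggregator[Array[Double], Double, Double] {
  def zero: Double = 0.0
  def reduce(b: Double, a: Array[Double]): Double = b + a.sum
  def merge(b1: Double, b2: Double): Double = b1 + b2
  def finish(r: Double): Double = r
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Assuming an existing SparkSession named `spark`: registering the
// aggregator works, but evaluating it hits the NPE described below.
val arraySum = functions.udaf(ArraySum)
spark.range(10)
  .selectExpr("array(cast(id as double)) as xs")
  .agg(arraySum(functions.col("xs")))
  .show()
}}

What it is tripping over is this placeholder: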

{{
/**
 * When constructing [[MapObjects]], the element type must be given, which may not be available
 * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
 * [[MapObjects]] during analysis after the input data is resolved.
 * Note that, ideally we should not serialize and send unresolved expressions to executors, but
 * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
 * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
 * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
 * it's just a performance issue(more network traffic), and will not fail.
 */
case class UnresolvedMapObjects(
    @transient function: Expression => Expression,
    child: Expression,
    customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
  override lazy val resolved = false

  override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
    throw new UnsupportedOperationException("not resolved")
  }
}
}}

The '@transient' annotation causes 'function' to be deserialized as 'null' on 
the executors, which produces a null-pointer exception here, when 
'MapObjects.apply' invokes 'function(loopVar)':

{{
object MapObjects {
  def apply(
      function: Expression => Expression,
      inputData: Expression,
      elementType: DataType,
      elementNullable: Boolean = true,
      customCollectionCls: Option[Class[_]] = None): MapObjects = {
    val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
    MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
  }
}
}}
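
The underlying mechanism can be demonstrated outside of Spark; this standalone 
sketch (the 'Holder' class is hypothetical, not Spark code) shows a 
'@transient' function field coming back as 'null' after a serialization round 
trip:

{{
import java.io._

// A @transient field is skipped during Java serialization and is
// therefore null after deserialization.
case class Holder(@transient f: String => String)

val out = new ByteArrayOutputStream()
new ObjectOutputStream(out).writeObject(Holder(s => s))
val in = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
val restored = in.readObject().asInstanceOf[Holder]
assert(restored.f == null)  // calling restored.f(...) would throw an NPE
}}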

I believe it may be possible to simply use 'loopVar' instead of 
'function(loopVar)' whenever 'function' is null, but I'd like a second opinion 
from the Catalyst developers on what a robust fix should be.
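
For illustration only, that workaround would amount to a null guard along 
these lines (an untested sketch against the code quoted above; whether the 
identity fallback is semantically safe is exactly the open question):

{{
object MapObjects {
  def apply(
      function: Expression => Expression,
      inputData: Expression,
      elementType: DataType,
      elementNullable: Boolean = true,
      customCollectionCls: Option[Class[_]] = None): MapObjects = {
    val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
    // If `function` was lost to @transient during deserialization,
    // fall back to passing the loop variable through unchanged.
    val mapped = if (function ne null) function(loopVar) else loopVar
    MapObjects(loopVar, mapped, inputData, customCollectionCls)
  }
}
}}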


