Erik Erlandson created SPARK-32159:
--------------------------------------
Summary: New udaf(Aggregator) has an integration bug with
UnresolvedMapObjects serialization
Key: SPARK-32159
URL: https://issues.apache.org/jira/browse/SPARK-32159
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.0.0
Reporter: Erik Erlandson
The new user-defined aggregator feature (SPARK-27296), based on calling
'functions.udaf(aggregator)', works fine when the aggregator input type is
atomic, e.g. 'Aggregator[Double, _, _]'. However, if the input type is an array,
like 'Aggregator[Array[Double], _, _]', it trips over the following:
{{
/**
 * When constructing [[MapObjects]], the element type must be given, which may not be available
 * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
 * [[MapObjects]] during analysis after the input data is resolved.
 * Note that, ideally we should not serialize and send unresolved expressions to executors, but
 * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
 * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
 * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
 * it's just a performance issue(more network traffic), and will not fail.
 */
case class UnresolvedMapObjects(
    @transient function: Expression => Expression,
    child: Expression,
    customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
  override lazy val resolved = false

  override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
    throw new UnsupportedOperationException("not resolved")
  }
}
}}
Because 'function' is marked '@transient', it is deserialized as 'null' on the
executors. That causes a NullPointerException here, at the call to
'function(loopVar)':
{{
object MapObjects {
  def apply(
      function: Expression => Expression,
      inputData: Expression,
      elementType: DataType,
      elementNullable: Boolean = true,
      customCollectionCls: Option[Class[_]] = None): MapObjects = {
    val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
    MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
  }
}
}}
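The '@transient' behaviour described above can be reproduced outside Spark.
The following is a minimal sketch, not Spark code: 'Placeholder' and
'TransientDemo' are hypothetical stand-ins for 'UnresolvedMapObjects', and plain
Java serialization is assumed as an approximation of what happens when the
expression is shipped to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for UnresolvedMapObjects: the function parameter is
// marked @transient, so Java serialization skips that field.
case class Placeholder(@transient function: String => String, child: String)

object TransientDemo {
  // Serialize and deserialize a value with plain Java serialization
  // (assumed here to approximate the driver-to-executor hop).
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = Placeholder(s => s.toUpperCase, "input")
    val copy = roundTrip(original)
    println(copy.child)            // the non-transient field survives the round trip
    println(copy.function == null) // the transient field is not restored
    // Calling copy.function("x") at this point would throw a NullPointerException.
  }
}
```

Serialization itself succeeds, which matches the intent of the doc comment; the
failure only shows up later, when something invokes the missing function.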
I believe it may be possible to just use 'loopVar' instead of
'function(loopVar)' whenever 'function' is null, but I would like a second
opinion from the catalyst developers on what a robust fix should be.
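To make the proposal concrete, here is a rough sketch of the null guard using
toy types. 'Expr', 'Var', 'Upper', 'MapOver' and 'NullGuardSketch' are all
hypothetical and only mimic the shape of 'MapObjects.apply'; this is not a patch
against Catalyst.

```scala
// Toy stand-ins for Catalyst expression types; none of these are Spark classes.
sealed trait Expr
case class Var(name: String) extends Expr
case class Upper(child: Expr) extends Expr
case class MapOver(loopVar: Var, body: Expr, input: Expr) extends Expr

object NullGuardSketch {
  // Mirrors the shape of MapObjects.apply, but falls back to the bare loop
  // variable when the function was lost (null) during deserialization.
  def build(function: Expr => Expr, input: Expr): MapOver = {
    val loopVar = Var("MapObject")
    val body = Option(function).map(f => f(loopVar)).getOrElse(loopVar)
    MapOver(loopVar, body, input)
  }

  def main(args: Array[String]): Unit = {
    println(build(Upper(_), Var("data"))) // function present: body wraps the loop variable
    println(build(null, Var("data")))     // function missing: body is the loop variable itself
  }
}
```

The guard avoids the exception, but it also silently drops whatever per-element
transformation the function would have applied, which is one reason a second
opinion on the robust fix is needed.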