Erik Erlandson created SPARK-32159: -------------------------------------- Summary: New udaf(Aggregator) has an integration bug with UnresolvedMapObjects serialization Key: SPARK-32159 URL: https://issues.apache.org/jira/browse/SPARK-32159 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Erik Erlandson
The new user defined aggregator feature (SPARK-27296) based on calling 'functions.udaf(aggregator)' works fine when the aggregator input type is atomic, e.g. 'Aggregator[Double, _, _]', however if the input type is an array, like 'Aggregator[Array[Double], _, _]', it is tripping over the following: {{ /** * When constructing [[MapObjects]], the element type must be given, which may not be available * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by * [[MapObjects]] during analysis after the input data is resolved. * Note that, ideally we should not serialize and send unresolved expressions to executors, but * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is * not serializable. Then even users mistakenly reference unresolved expression and serialize it, * it's just a performance issue(more network traffic), and will not fail. */ case class UnresolvedMapObjects( @transient function: Expression => Expression, child: Expression, customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable { override lazy val resolved = false override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse { throw new UnsupportedOperationException("not resolved") } } }} The '@transient' is causing the function to be unpacked as 'null' over on the executors, and it is causing a null-pointer exception here, when it tries to do 'function(loopVar)' {{ object MapObjects { def apply( function: Expression => Expression, inputData: Expression, elementType: DataType, elementNullable: Boolean = true, customCollectionCls: Option[Class[_]] = None): MapObjects = { val loopVar = LambdaVariable("MapObject", elementType, elementNullable) MapObjects(loopVar, function(loopVar), inputData, customCollectionCls) } } }} I believe it may be possible to just use 'loopVar' instead of 'function(loopVar)', whenever 'function' is null, but need second opinion from catalyst developers on what a robust fix should be -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org