Tom Sisso created SPARK-50581:
---------------------------------
Summary: Dataset.observe() is not working with UDAF
Key: SPARK-50581
URL: https://issues.apache.org/jira/browse/SPARK-50581
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.5.3, 3.0.0
Reporter: Tom Sisso
Dataset.observe() supports generic aggregation expressions for collected
metrics, but it does not work with a custom UDAF: the query fails with a
serialization exception (the fix may be very small: annotating the
inputProjection field in ScalaAggregator as @transient).
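A rough sketch of that one-line change, for illustration only (the actual
declaration in ScalaAggregator may differ slightly from what is shown here):
{code:java}
// Sketch, not verified against current sources of
// org.apache.spark.sql.execution.aggregate.ScalaAggregator:
// marking the generated projection @transient keeps it out of Java
// serialization; being a lazy val, it is simply re-created after
// deserialization on the receiving side.
@transient private[this] lazy val inputProjection = UnsafeProjection.create(children)
{code}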
Failure example:
{code:java}
import java.lang.{Long => JLong}

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{expr, udaf}

spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
  def zero: JLong = 0L
  def reduce(b: JLong, a: JLong): JLong = a + b
  def merge(b1: JLong, b2: JLong): JLong = b1 + b2
  def finish(r: JLong): JLong = r
  def bufferEncoder: Encoder[JLong] = Encoders.LONG
  def outputEncoder: Encoder[JLong] = Encoders.LONG
}))

val df = spark.range(100)

// regular UDAF usage works
df.agg(expr("someUdaf(id)").as("agg")).show()

// UDAF usage in observe() fails with a serialization exception
df.observe(name = "my_metrics", expr("someUdaf(id)").as("agg")).collect()
{code}
Exception (Encoders.javaSerialization[Long] was used only to surface the full
serialization stack):
{code:java}
org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not serializable result: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
Serialization stack:
  - object not serializable (class: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection, value: <function1>)
  - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, name: inputProjection, type: class org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
  - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, someUdaf(input[0, bigint, false]))
  - element of array (index: 0)
  - array (class [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;, size 1)
  - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, name: typedImperatives, type: class [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
  - object (class org.apache.spark.sql.execution.AggregatingAccumulator, AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty row]))
  - writeExternal data
  - externalizable object (class org.apache.spark.scheduler.DirectTaskResult, org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218)
{code}
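For reference, once the UDAF serializes correctly the observed metric would
typically be read back via a QueryExecutionListener or, on Spark 3.3+, the
Observation helper; a hedged sketch (names are illustrative, not part of the
failing repro):
{code:java}
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.expr

// Observation registers a query listener and exposes the observed row
// once the first action on the observed Dataset finishes.
val obs = Observation("my_metrics")
val observed = spark.range(100).observe(obs, expr("sum(id)").as("agg"))
observed.collect()
println(obs.get) // blocks until the action completes; expected to contain agg -> 4950
{code}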