[
https://issues.apache.org/jira/browse/SPARK-50581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon reassigned SPARK-50581:
------------------------------------
Assignee: Tom Sisso
> Dataset.observe() is not working with UDAF
> ------------------------------------------
>
> Key: SPARK-50581
> URL: https://issues.apache.org/jira/browse/SPARK-50581
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0, 3.5.3
> Reporter: Tom Sisso
> Assignee: Tom Sisso
> Priority: Minor
> Labels: pull-request-available
>
> Dataset.observe() supports generic aggregation expressions for collected
> metrics, but it fails when used with a custom UDAF.
> It fails with a serialization exception (the fix may be very small: annotate
> the inputProjection field in ScalaAggregator as @transient).
>
> Failure example:
> {code:java}
> import java.lang.{Long => JLong}
> spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
> def zero: JLong = 0L
> def reduce(b: JLong, a: JLong): JLong = a + b
> def merge(b1: JLong, b2: JLong): JLong = b1 + b2
> def finish(r: JLong): JLong = r
> def bufferEncoder: Encoder[JLong] = Encoders.LONG
> def outputEncoder: Encoder[JLong] = Encoders.LONG
> }))
> val df = spark.range(100)
> // regular udaf usage is working
> df.agg(expr("someUdaf(id)").as("agg")).show()
> // udaf usage in observe is not working (serialization exception)
> df.observe(
> name = "my_metrics",
> expr("someUdaf(id)").as("agg")
> )
> .collect(){code}
> Exception
> (Encoders.javaSerialization[Long] was used here only to surface the serialization stack):
> {code:java}
> Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not
> serializable result:
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
> Serialization stack:
> - object not serializable (class:
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
> value: <function1>)
> - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator,
> name: inputProjection, type: class
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
> - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator,
> someUdaf(input[0, bigint, false]))
> - element of array (index: 0)
> - array (class
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
> size 1)
> - field (class: org.apache.spark.sql.execution.AggregatingAccumulator,
> name: typedImperatives, type: class
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
> - object (class org.apache.spark.sql.execution.AggregatingAccumulator,
> AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty
> row]))
> - writeExternal data
> - externalizable object (class org.apache.spark.scheduler.DirectTaskResult,
> org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
> org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0
> in stage 3.0 (TID 3) had a not serializable result:
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
> Serialization stack:
> - object not serializable (class:
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
> value: <function1>)
> - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator,
> name: inputProjection, type: class
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
> - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator,
> someUdaf(input[0, bigint, false]))
> - element of array (index: 0)
> - array (class
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
> size 1)
> - field (class: org.apache.spark.sql.execution.AggregatingAccumulator,
> name: typedImperatives, type: class
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
> - object (class org.apache.spark.sql.execution.AggregatingAccumulator,
> AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty
> row]))
> - writeExternal data
> - externalizable object (class
> org.apache.spark.scheduler.DirectTaskResult,
> org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
> at
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
> at scala.Option.foreach(Option.scala:407)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
> at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
> at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218){code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]