[ 
https://issues.apache.org/jira/browse/SPARK-50581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-50581:
------------------------------------

    Assignee: Tom Sisso

> Dataset.observe() is not working with UDAF
> ------------------------------------------
>
>                 Key: SPARK-50581
>                 URL: https://issues.apache.org/jira/browse/SPARK-50581
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0, 3.5.3
>            Reporter: Tom Sisso
>            Assignee: Tom Sisso
>            Priority: Minor
>              Labels: pull-request-available
>
> Dataset.observe() supports generic aggregation expressions for collected 
> metrics, but it doesn't work when trying to use custom UDAF.
> It fails on serialization exception (the fix may be very small: annotate 
> inputProjection field in ScalaAggregator as @transient). 
>  
> Failure example:
> {code:java}
> import java.lang.{Long => JLong}
> spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
>   def zero: JLong = 0L
>   def reduce(b: JLong, a: JLong): JLong = a + b
>   def merge(b1: JLong, b2: JLong): JLong = b1 + b2
>   def finish(r: JLong): JLong = r
>   def bufferEncoder: Encoder[JLong] = Encoders.LONG
>   def outputEncoder: Encoder[JLong] = Encoders.LONG
> }))
> val df = spark.range(100)
> // regular udaf usage is working
> df.agg(expr("someUdaf(id)").as("agg")).show()
> // udaf usage in observe is not working (serialization exception)
> df.observe(
>     name = "my_metrics",
>     expr("someUdaf(id)").as("agg")
>   )
>   .collect(){code}
> Exception 
> (used Encoders.javaSerialization[Long] just for the serialization stack):
> {code:java}
> Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not 
> serializable result: 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
> Serialization stack:
>   - object not serializable (class: 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
>  value: <function1>)
>   - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
> name: inputProjection, type: class 
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
>   - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
> someUdaf(input[0, bigint, false]))
>   - element of array (index: 0)
>   - array (class 
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
>  size 1)
>   - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, 
> name: typedImperatives, type: class 
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
>   - object (class org.apache.spark.sql.execution.AggregatingAccumulator, 
> AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty 
> row]))
>   - writeExternal data
>   - externalizable object (class org.apache.spark.scheduler.DirectTaskResult, 
> org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
>   org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 
> in stage 3.0 (TID 3) had a not serializable result: 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
>   Serialization stack:
>   - object not serializable (class: 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
>  value: <function1>)
>     - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
> name: inputProjection, type: class 
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
>     - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
> someUdaf(input[0, bigint, false]))
>     - element of array (index: 0)
>     - array (class 
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
>  size 1)
>     - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, 
> name: typedImperatives, type: class 
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
>     - object (class org.apache.spark.sql.execution.AggregatingAccumulator, 
> AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty 
> row]))
>     - writeExternal data
>     - externalizable object (class 
> org.apache.spark.scheduler.DirectTaskResult, 
> org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
>     at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
>     at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
>     at scala.Option.foreach(Option.scala:407)
>     at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
>     at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
>     at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
>     at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to