[ 
https://issues.apache.org/jira/browse/SPARK-50581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Sisso updated SPARK-50581:
------------------------------
    Description: 
Dataset.observe() supports generic aggregation expressions for collected 
metrics, but it doesn't work when trying to use custom UDAF.
It fails on serialization exception (the fix may be very small: annotate 
inputProjection field in ScalaAggregator as @transient). 
 
Failure example:
{code:java}
import java.lang.{Long => JLong}

spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
  def zero: JLong = 0L
  def reduce(b: JLong, a: JLong): JLong = a + b
  def merge(b1: JLong, b2: JLong): JLong = b1 + b2
  def finish(r: JLong): JLong = r
  def bufferEncoder: Encoder[JLong] = Encoders.LONG
  def outputEncoder: Encoder[JLong] = Encoders.LONG
}))

val df = spark.range(100)

// regular udaf usage is working
df.agg(expr("someUdaf(id)").as("agg")).show()

// udaf usage in observe is not working (serialization exception)
df.observe(
    name = "my_metrics",
    expr("someUdaf(id)").as("agg")
  )
  .collect(){code}
Exception 
(used Encoders.javaSerialization[Long] just for the serialization stack):
{code:java}
Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not 
serializable result: 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
Serialization stack:
  - object not serializable (class: 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
 value: <function1>)
  - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
name: inputProjection, type: class 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
  - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
someUdaf(input[0, bigint, false]))
  - element of array (index: 0)
  - array (class 
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
 size 1)
  - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, name: 
typedImperatives, type: class 
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
  - object (class org.apache.spark.sql.execution.AggregatingAccumulator, 
AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty 
row]))
  - writeExternal data
  - externalizable object (class org.apache.spark.scheduler.DirectTaskResult, 
org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
  org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 
in stage 3.0 (TID 3) had a not serializable result: 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
  Serialization stack:
  - object not serializable (class: 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
 value: <function1>)
    - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
name: inputProjection, type: class 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
    - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
someUdaf(input[0, bigint, false]))
    - element of array (index: 0)
    - array (class 
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
 size 1)
    - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, 
name: typedImperatives, type: class 
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
    - object (class org.apache.spark.sql.execution.AggregatingAccumulator, 
AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty 
row]))
    - writeExternal data
    - externalizable object (class org.apache.spark.scheduler.DirectTaskResult, 
org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
    at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
    at scala.Option.foreach(Option.scala:407)
    at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
    at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218){code}

  was:
Dataset.observe() supports generic aggregation expressions for collected 
metrics, but it doesn't work when trying to use custom UDAF.
It fails on serialization exception (the fix may be very small: annotate 
inputProjection field in ScalaAggregator as @transient). 
 
Failure example:
{code:java}
    import java.lang.{Long => JLong}
    spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {   
     def zero: JLong = 0L        def reduce(b: JLong, a: JLong): JLong = a + b  
      def merge(b1: JLong, b2: JLong): JLong = b1 + b2        def finish(r: 
JLong): JLong = r        def bufferEncoder: Encoder[JLong] = Encoders.LONG      
  def outputEncoder: Encoder[JLong] = Encoders.LONG      }))
      val df = spark.range(100)
      // regular udaf usage is working      
df.agg(expr("someUdaf(id)").as("agg")).show()
      // udaf usage in observe is not working (serialization exception)        
df.observe(            name = "my_metrics",            
expr("someUdaf(id)").as("agg")          )          .collect(){code}
Exception 
(used Encoders.javaSerialization[Long] just for the serialization stack):
{code:java}
    Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not 
serializable result: 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjectionSerialization
 stack:    - object not serializable (class: 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
 value: <function1>)    - field (class: 
org.apache.spark.sql.execution.aggregate.ScalaAggregator, name: 
inputProjection, type: class 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection)    - object (class 
org.apache.spark.sql.execution.aggregate.ScalaAggregator, someUdaf(input[0, 
bigint, false]))    - element of array (index: 0)    - array (class 
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
 size 1)    - field (class: 
org.apache.spark.sql.execution.AggregatingAccumulator, name: typedImperatives, 
type: class 
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
    - object (class org.apache.spark.sql.execution.AggregatingAccumulator, 
AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty 
row]))    - writeExternal data    - externalizable object (class 
org.apache.spark.scheduler.DirectTaskResult, 
org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)org.apache.spark.SparkException:
 Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not 
serializable result: 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjectionSerialization
 stack:    - object not serializable (class: 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
 value: <function1>)    - field (class: 
org.apache.spark.sql.execution.aggregate.ScalaAggregator, name: 
inputProjection, type: class 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection)    - object (class 
org.apache.spark.sql.execution.aggregate.ScalaAggregator, someUdaf(input[0, 
bigint, false]))    - element of array (index: 0)    - array (class 
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
 size 1)    - field (class: 
org.apache.spark.sql.execution.AggregatingAccumulator, name: typedImperatives, 
type: class 
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
    - object (class org.apache.spark.sql.execution.AggregatingAccumulator, 
AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty 
row]))    - writeExternal data    - externalizable object (class 
org.apache.spark.scheduler.DirectTaskResult, 
org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)    at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)    at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)    
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
    at scala.Option.foreach(Option.scala:407)    at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)    at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)    at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)    at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)    at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)    at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)    at 
org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)  
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)  
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)    at 
org.apache.spark.rdd.RDD.collect(RDD.scala:1021)    at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)    
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218) {code}


> Dataset.observe() is not working with UDAF
> ------------------------------------------
>
>                 Key: SPARK-50581
>                 URL: https://issues.apache.org/jira/browse/SPARK-50581
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0, 3.5.3
>            Reporter: Tom Sisso
>            Priority: Minor
>
> Dataset.observe() supports generic aggregation expressions for collected 
> metrics, but it doesn't work when trying to use custom UDAF.
> It fails on serialization exception (the fix may be very small: annotate 
> inputProjection field in ScalaAggregator as @transient). 
>  
> Failure example:
> {code:java}
> import java.lang.{Long => JLong}
> spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
>   def zero: JLong = 0L
>   def reduce(b: JLong, a: JLong): JLong = a + b
>   def merge(b1: JLong, b2: JLong): JLong = b1 + b2
>   def finish(r: JLong): JLong = r
>   def bufferEncoder: Encoder[JLong] = Encoders.LONG
>   def outputEncoder: Encoder[JLong] = Encoders.LONG
> }))
> val df = spark.range(100)
> // regular udaf usage is working
> df.agg(expr("someUdaf(id)").as("agg")).show()
> // udaf usage in observe is not working (serialization exception)
> df.observe(
>     name = "my_metrics",
>     expr("someUdaf(id)").as("agg")
>   )
>   .collect(){code}
> Exception 
> (used Encoders.javaSerialization[Long] just for the serialization stack):
> {code:java}
> Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not 
> serializable result: 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
> Serialization stack:
>   - object not serializable (class: 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
>  value: <function1>)
>   - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
> name: inputProjection, type: class 
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
>   - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
> someUdaf(input[0, bigint, false]))
>   - element of array (index: 0)
>   - array (class 
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
>  size 1)
>   - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, 
> name: typedImperatives, type: class 
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
>   - object (class org.apache.spark.sql.execution.AggregatingAccumulator, 
> AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty 
> row]))
>   - writeExternal data
>   - externalizable object (class org.apache.spark.scheduler.DirectTaskResult, 
> org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
>   org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 
> in stage 3.0 (TID 3) had a not serializable result: 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
>   Serialization stack:
>   - object not serializable (class: 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
>  value: <function1>)
>     - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
> name: inputProjection, type: class 
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
>     - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, 
> someUdaf(input[0, bigint, false]))
>     - element of array (index: 0)
>     - array (class 
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
>  size 1)
>     - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, 
> name: typedImperatives, type: class 
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
>     - object (class org.apache.spark.sql.execution.AggregatingAccumulator, 
> AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty 
> row]))
>     - writeExternal data
>     - externalizable object (class 
> org.apache.spark.scheduler.DirectTaskResult, 
> org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
>     at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
>     at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
>     at scala.Option.foreach(Option.scala:407)
>     at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
>     at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
>     at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
>     at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to