Tom Sisso created SPARK-50581:
---------------------------------

             Summary: Dataset.observe() is not working with UDAF
                 Key: SPARK-50581
                 URL: https://issues.apache.org/jira/browse/SPARK-50581
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.3, 3.0.0
            Reporter: Tom Sisso


Dataset.observe() supports generic aggregation expressions for collected
metrics, but it does not work with a custom UDAF.
It fails with a serialization exception (the fix may be very small: annotate
the inputProjection field in ScalaAggregator as @transient).
 
Failure example:
{code:java}
import java.lang.{Long => JLong}

spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
  def zero: JLong = 0L
  def reduce(b: JLong, a: JLong): JLong = a + b
  def merge(b1: JLong, b2: JLong): JLong = b1 + b2
  def finish(r: JLong): JLong = r
  def bufferEncoder: Encoder[JLong] = Encoders.LONG
  def outputEncoder: Encoder[JLong] = Encoders.LONG
}))

val df = spark.range(100)

// regular UDAF usage works
df.agg(expr("someUdaf(id)").as("agg")).show()

// UDAF usage in observe fails with a serialization exception
df.observe(
    name = "my_metrics",
    expr("someUdaf(id)").as("agg")
  )
  .collect()
{code}
Exception (Encoders.javaSerialization[Long] was used only to surface the serialization stack):
{code:java}
Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not serializable result: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection, value: <function1>)
    - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator, name: inputProjection, type: class org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
    - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator, someUdaf(input[0, bigint, false]))
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;, size 1)
    - field (class: org.apache.spark.sql.execution.AggregatingAccumulator, name: typedImperatives, type: class [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
    - object (class org.apache.spark.sql.execution.AggregatingAccumulator, AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty row]))
    - writeExternal data
    - externalizable object (class org.apache.spark.scheduler.DirectTaskResult, org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not serializable result: org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218) {code}
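The failure mode above (a non-serializable, code-generated field dragged along when the accumulator is shipped back in the task result) can be illustrated outside Spark. This is a hypothetical sketch of the @transient pattern the fix suggestion relies on, not ScalaAggregator's actual code: a {{@transient lazy val}} is skipped during Java serialization and rebuilt on first use after deserialization.
{code:java}
import java.io._

// Hypothetical stand-in for a generated, non-serializable projection
// (plays the role of GeneratedClass$SpecificUnsafeProjection).
class UnsafeProjectionLike {
  def apply(x: Long): Long = x
}

// The pattern the report suggests for ScalaAggregator.inputProjection:
// mark the derived field @transient so it is not serialized, and make it
// lazy so it is re-created on the deserializing side when first accessed.
class AggregatorLike extends Serializable {
  @transient private lazy val inputProjection = new UnsafeProjectionLike
  def project(x: Long): Long = inputProjection(x)
}

object TransientSketch {
  // Serialize and deserialize an object through in-memory byte streams.
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // Without @transient this round trip would throw NotSerializableException,
    // which is exactly the shape of the observe() failure.
    val copy = roundTrip(new AggregatorLike)
    println(copy.project(42L))
  }
}
{code}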



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
