[
https://issues.apache.org/jira/browse/SPARK-50581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tom Sisso updated SPARK-50581:
------------------------------
Description:
Dataset.observe() supports generic aggregation expressions for collected
metrics, but it doesn't work when trying to use a custom UDAF.
It fails with a serialization exception (the fix may be very small: annotate
the inputProjection field in ScalaAggregator as @transient).
Failure example:
{code:java}
import java.lang.{Long => JLong}
spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
def zero: JLong = 0L
def reduce(b: JLong, a: JLong): JLong = a + b
def merge(b1: JLong, b2: JLong): JLong = b1 + b2
def finish(r: JLong): JLong = r
def bufferEncoder: Encoder[JLong] = Encoders.LONG
def outputEncoder: Encoder[JLong] = Encoders.LONG
}))
val df = spark.range(100)
// regular udaf usage is working
df.agg(expr("someUdaf(id)").as("agg")).show()
// udaf usage in observe is not working (serialization exception)
df.observe(
name = "my_metrics",
expr("someUdaf(id)").as("agg")
)
.collect(){code}
Exception
(used Encoders.javaSerialization[Long] just for the serialization stack):
{code:java}
Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not
serializable result:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
Serialization stack:
- object not serializable (class:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
value: <function1>)
- field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator,
name: inputProjection, type: class
org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
- object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator,
someUdaf(input[0, bigint, false]))
- element of array (index: 0)
- array (class
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
size 1)
- field (class: org.apache.spark.sql.execution.AggregatingAccumulator, name:
typedImperatives, type: class
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
- object (class org.apache.spark.sql.execution.AggregatingAccumulator,
AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty
row]))
- writeExternal data
- externalizable object (class org.apache.spark.scheduler.DirectTaskResult,
org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0
in stage 3.0 (TID 3) had a not serializable result:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
Serialization stack:
- object not serializable (class:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
value: <function1>)
- field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator,
name: inputProjection, type: class
org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
- object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator,
someUdaf(input[0, bigint, false]))
- element of array (index: 0)
- array (class
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
size 1)
- field (class: org.apache.spark.sql.execution.AggregatingAccumulator,
name: typedImperatives, type: class
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
- object (class org.apache.spark.sql.execution.AggregatingAccumulator,
AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty
row]))
- writeExternal data
- externalizable object (class org.apache.spark.scheduler.DirectTaskResult,
org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
at scala.Option.foreach(Option.scala:407)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218){code}
was:
Dataset.observe() supports generic aggregation expressions for collected
metrics, but it doesn't work when trying to use a custom UDAF.
It fails with a serialization exception (the fix may be very small: annotate
the inputProjection field in ScalaAggregator as @transient).
Failure example:
{code:java}
import java.lang.{Long => JLong}
spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
def zero: JLong = 0L
def reduce(b: JLong, a: JLong): JLong = a + b
def merge(b1: JLong, b2: JLong): JLong = b1 + b2
def finish(r: JLong): JLong = r
def bufferEncoder: Encoder[JLong] = Encoders.LONG
def outputEncoder: Encoder[JLong] = Encoders.LONG }))
val df = spark.range(100)
// regular udaf usage is working
df.agg(expr("someUdaf(id)").as("agg")).show()
// udaf usage in observe is not working (serialization exception)
df.observe(
  name = "my_metrics",
  expr("someUdaf(id)").as("agg")
).collect(){code}
Exception
(used Encoders.javaSerialization[Long] just for the serialization stack):
{code:java}
Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not
serializable result:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
Serialization stack: - object not serializable (class:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
value: <function1>) - field (class:
org.apache.spark.sql.execution.aggregate.ScalaAggregator, name:
inputProjection, type: class
org.apache.spark.sql.catalyst.expressions.UnsafeProjection) - object (class
org.apache.spark.sql.execution.aggregate.ScalaAggregator, someUdaf(input[0,
bigint, false])) - element of array (index: 0) - array (class
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
size 1) - field (class:
org.apache.spark.sql.execution.AggregatingAccumulator, name: typedImperatives,
type: class
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
- object (class org.apache.spark.sql.execution.AggregatingAccumulator,
AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty
row])) - writeExternal data - externalizable object (class
org.apache.spark.scheduler.DirectTaskResult,
org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
org.apache.spark.SparkException:
Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not
serializable result:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
Serialization stack: - object not serializable (class:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
value: <function1>) - field (class:
org.apache.spark.sql.execution.aggregate.ScalaAggregator, name:
inputProjection, type: class
org.apache.spark.sql.catalyst.expressions.UnsafeProjection) - object (class
org.apache.spark.sql.execution.aggregate.ScalaAggregator, someUdaf(input[0,
bigint, false])) - element of array (index: 0) - array (class
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
size 1) - field (class:
org.apache.spark.sql.execution.AggregatingAccumulator, name: typedImperatives,
type: class
[Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
- object (class org.apache.spark.sql.execution.AggregatingAccumulator,
AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty
row])) - writeExternal data - externalizable object (class
org.apache.spark.scheduler.DirectTaskResult,
org.apache.spark.scheduler.DirectTaskResult@31bbb9d7) at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
at scala.Option.foreach(Option.scala:407) at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976) at
org.apache.spark.SparkContext.runJob(SparkContext.scala:2258) at
org.apache.spark.SparkContext.runJob(SparkContext.scala:2279) at
org.apache.spark.SparkContext.runJob(SparkContext.scala:2298) at
org.apache.spark.SparkContext.runJob(SparkContext.scala:2323) at
org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022) at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:408) at
org.apache.spark.rdd.RDD.collect(RDD.scala:1021) at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218) {code}
> Dataset.observe() is not working with UDAF
> ------------------------------------------
>
> Key: SPARK-50581
> URL: https://issues.apache.org/jira/browse/SPARK-50581
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0, 3.5.3
> Reporter: Tom Sisso
> Priority: Minor
>
> Dataset.observe() supports generic aggregation expressions for collected
> metrics, but it doesn't work when trying to use a custom UDAF.
> It fails with a serialization exception (the fix may be very small: annotate
> the inputProjection field in ScalaAggregator as @transient).
>
> Failure example:
> {code:java}
> import java.lang.{Long => JLong}
> spark.udf.register("someUdaf", udaf(new Aggregator[JLong, JLong, JLong] {
> def zero: JLong = 0L
> def reduce(b: JLong, a: JLong): JLong = a + b
> def merge(b1: JLong, b2: JLong): JLong = b1 + b2
> def finish(r: JLong): JLong = r
> def bufferEncoder: Encoder[JLong] = Encoders.LONG
> def outputEncoder: Encoder[JLong] = Encoders.LONG
> }))
> val df = spark.range(100)
> // regular udaf usage is working
> df.agg(expr("someUdaf(id)").as("agg")).show()
> // udaf usage in observe is not working (serialization exception)
> df.observe(
> name = "my_metrics",
> expr("someUdaf(id)").as("agg")
> )
> .collect(){code}
> Exception
> (used Encoders.javaSerialization[Long] just for the serialization stack):
> {code:java}
> Job aborted due to stage failure: task 0.0 in stage 3.0 (TID 3) had a not
> serializable result:
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
> Serialization stack:
> - object not serializable (class:
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
> value: <function1>)
> - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator,
> name: inputProjection, type: class
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
> - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator,
> someUdaf(input[0, bigint, false]))
> - element of array (index: 0)
> - array (class
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
> size 1)
> - field (class: org.apache.spark.sql.execution.AggregatingAccumulator,
> name: typedImperatives, type: class
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
> - object (class org.apache.spark.sql.execution.AggregatingAccumulator,
> AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty
> row]))
> - writeExternal data
> - externalizable object (class org.apache.spark.scheduler.DirectTaskResult,
> org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
> org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0
> in stage 3.0 (TID 3) had a not serializable result:
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
> Serialization stack:
> - object not serializable (class:
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection,
> value: <function1>)
> - field (class: org.apache.spark.sql.execution.aggregate.ScalaAggregator,
> name: inputProjection, type: class
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection)
> - object (class org.apache.spark.sql.execution.aggregate.ScalaAggregator,
> someUdaf(input[0, bigint, false]))
> - element of array (index: 0)
> - array (class
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;,
> size 1)
> - field (class: org.apache.spark.sql.execution.AggregatingAccumulator,
> name: typedImperatives, type: class
> [Lorg.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate;)
> - object (class org.apache.spark.sql.execution.AggregatingAccumulator,
> AggregatingAccumulator(id: 136, name: Some(Collected metrics), value: [empty
> row]))
> - writeExternal data
> - externalizable object (class
> org.apache.spark.scheduler.DirectTaskResult,
> org.apache.spark.scheduler.DirectTaskResult@31bbb9d7)
> at
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
> at scala.Option.foreach(Option.scala:407)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
> at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
> at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218){code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]