[ https://issues.apache.org/jira/browse/SPARK-45176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huw updated SPARK-45176:
------------------------
    Description: 
Probably related to SPARK-39044, but potentially also to this comment in 
Executor.scala:
{quote}// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
{quote}
The class cast exception I'm seeing is
{quote}
java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir
{quote}
But I've seen it with other aggregation buffers like QuantileSummaries as well.

It's my belief that withBufferSerialized() on the AggregatingAccumulator is 
being called twice, which leads to serializeAggregateBufferInPlace(buffer) 
also being called twice for the TypedImperativeAggregate. The second time 
around, the buffer is already a byte array, and the asInstanceOf[T] in 
getBufferObject throws.
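
The suspected failure mode can be reproduced in miniature. The following is a self-contained sketch, not Spark's actual classes: the Reservoir, buffer, getBufferObject and serializeInPlace below merely mimic the getBufferObject / serializeAggregateBufferInPlace pattern described above.
{code:scala}
object DoubleSerializeSketch {
  final case class Reservoir(samples: Seq[Int])

  // One object-typed slot, standing in for the aggregation buffer's
  // object column that holds the TypedImperativeAggregate state.
  val buffer: Array[AnyRef] = Array(Reservoir(Seq(1, 2, 3)))

  // Mimics getBufferObject: an unchecked cast back to the buffer type.
  def getBufferObject[T](i: Int): T = buffer(i).asInstanceOf[T]

  // Mimics serializeAggregateBufferInPlace: replaces the live object
  // with its serialized (byte array) form, in place.
  def serializeInPlace(): Unit = {
    val r: Reservoir = getBufferObject[Reservoir](0)
    buffer(0) = r.samples.mkString(",").getBytes("UTF-8")
  }

  def main(args: Array[String]): Unit = {
    serializeInPlace() // first call: buffer(0) becomes a byte array
    serializeInPlace() // second call: buffer(0) is already [B, so the
                       // cast to Reservoir throws ClassCastException
  }
}
{code}
If withBufferSerialized() runs twice on the same buffer, the second pass reads back the byte array and fails exactly like the trace below.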

This doesn't appear to happen on all runs, and it may be that it only occurs 
when another exception is already in flight. I have a further suspicion that 
the cause originates with
{quote}
SerializationDebugger.improveException
{quote}
which traverses the task and forces writeExternal to be called.

Setting
|spark.serializer.extraDebugInfo|false|

seems to make things a bit more reliable (I haven't seen the error with this 
setting in place), and points strongly in that direction.
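
For reference, a sketch of how that workaround might be applied (assuming the config is picked up before executors start; via spark-submit --conf it certainly is):
{code:scala}
import org.apache.spark.sql.SparkSession

// Disable the SerializationDebugger pass that "improves" serialization
// exceptions by re-traversing the object graph, which this report
// suspects of triggering the second writeReplace/writeExternal run.
val spark = SparkSession.builder()
  .appName("SPARK-45176-workaround")
  .config("spark.serializer.extraDebugInfo", "false")
  .getOrCreate()
{code}
Equivalently: spark-submit --conf spark.serializer.extraDebugInfo=false ...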

Stack trace:
{quote}
Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=15, partition=10) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(java.io.IOException,java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app'),[Ljava.lang.StackTraceElement;@7fe2f462,java.io.IOException: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app')
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1502)
at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:59)
at java.base/java.io.ObjectOutputStream.writeExternalData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.SerializerHelper$.serializeToChunkedBuffer(SerializerHelper.scala:42)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module java.base of loader 'bootstrap'; org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.expressions.aggregate.ReservoirSample.serialize(ReservoirSample.scala:33)
at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:206)
at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33)
at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186)
at jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at java.base/java.io.ObjectStreamClass.invokeWriteReplace(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:62)
at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:62)
at scala.collection.immutable.Vector.foreach(Vector.scala:1856)
at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:62)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1495)
... 11 more
{quote}


> AggregatingAccumulator with TypedImperativeAggregate throwing ClassCastException
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-45176
>                 URL: https://issues.apache.org/jira/browse/SPARK-45176
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0, 3.4.1
>            Reporter: Huw
>            Priority: Major
>


