Arun Benoy V created SPARK-37892:
------------------------------------

             Summary: Spark streaming task failure with ClassCastException [C cannot be cast to [J, at org.apache.spark.unsafe.memory.HeapMemoryAllocator.allocate
                 Key: SPARK-37892
                 URL: https://issues.apache.org/jira/browse/SPARK-37892
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.2.0
            Reporter: Arun Benoy V


We have a Java Spark streaming application that performs SCD2 (slowly changing dimension type 2) operations on Delta Lake.
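
For readers unfamiliar with SCD2: when an incoming record changes the value of an existing key, the current row for that key is closed out and a new current row is inserted, so the full history is preserved. The following is a minimal in-memory sketch of that rule in plain Java. It is not our application code and does not use the Delta Lake API (on Delta Lake this is usually expressed as a MERGE). The names `DimRow`, `Scd2Sketch.apply` and the fields `validFrom`, `validTo` and `current` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical dimension row: one version of a key's payload and its validity window.
final class DimRow {
    final String key;
    final String payload;
    final long validFrom;
    long validTo;       // Long.MAX_VALUE while the row is current
    boolean current;

    DimRow(String key, String payload, long validFrom) {
        this.key = key;
        this.payload = payload;
        this.validFrom = validFrom;
        this.validTo = Long.MAX_VALUE;
        this.current = true;
    }
}

final class Scd2Sketch {
    // Applies one incoming (key, payload) event at time ts using SCD2 rules:
    // unchanged payload -> no-op; changed payload -> close the current row and add a new one;
    // unseen key -> add a new current row.
    static void apply(List<DimRow> table, String key, String payload, long ts) {
        for (DimRow row : table) {
            if (row.current && row.key.equals(key)) {
                if (Objects.equals(row.payload, payload)) {
                    return; // nothing changed, keep the current version
                }
                row.current = false; // close out the old version
                row.validTo = ts;
                break;               // stop iterating before modifying the list
            }
        }
        table.add(new DimRow(key, payload, ts));
    }

    public static void main(String[] args) {
        List<DimRow> table = new ArrayList<>();
        apply(table, "k1", "v1", 1L);
        apply(table, "k1", "v1", 2L); // duplicate payload, ignored
        apply(table, "k1", "v2", 3L); // change: closes v1 and adds v2
        for (DimRow r : table) {
            System.out.println(r.key + " " + r.payload + " [" + r.validFrom + ", " + r.validTo
                    + ") current=" + r.current);
        }
    }
}
```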

We were using Spark 3.0.0 and Delta Lake 0.7.0. After upgrading to Spark 3.2.0 and Delta Lake 1.1.0, we see the following exception under a load of 100K events:

_Caused by: java.lang.ClassCastException: [C cannot be cast to [J_
_at org.apache.spark.unsafe.memory.HeapMemoryAllocator.allocate(HeapMemoryAllocator.java:58)_
_at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:314)_
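
For context, `[C` and `[J` are JVM binary names for array types: `[C` is `char[]` and `[J` is `long[]`. The exception therefore means that a `char[]` reached a point where Spark's heap allocator expected a `long[]`. The plain-Java sketch below (the class `DescriptorDemo` is hypothetical and unrelated to Spark's code) decodes the names and reproduces the same exception type. The exact message wording depends on the JDK version; newer JDKs phrase it as "class [C cannot be cast to class [J ..." with module details appended.

```java
final class DescriptorDemo {
    // Returns the JVM binary name of an array's class, e.g. "[C" for char[] and "[J" for long[].
    static String binaryName(Object array) {
        return array.getClass().getName();
    }

    // Performs the same kind of cast that fails in the reported trace and returns the message.
    static String castCharArrayToLongArray() {
        Object buffer = new char[8];        // a char[] ("[C") seen only as Object
        try {
            long[] words = (long[]) buffer; // runtime checkcast to long[] ("[J") fails here
            return "unexpected success: " + words.length;
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(binaryName(new char[0])); // [C
        System.out.println(binaryName(new long[0])); // [J
        System.out.println(castCharArrayToLongArray());
    }
}
```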

We are currently in development, so we run in local mode. For the 100K test we use:
 * Spark master: local[4]
 * Driver memory: 4GB
 * Shuffle partitions: 25
 * Kafka topic partitions: 25
 * Micro-batch size: 5000
 * Trigger interval: 120 sec
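
As a hedged sketch of how these settings map onto Spark configuration, the class below collects them and renders the `spark-submit` flags. Mapping "micro-batch size" to the Kafka source option `maxOffsetsPerTrigger` is an assumption, since the report does not say how the batch size is enforced; the class name `ReportedSettings` is hypothetical. Per the Spark configuration docs, in client/local mode `spark.driver.memory` cannot take effect when set from inside the already-running driver JVM, so it has to be given at launch (e.g. `--driver-memory 4g`). The Kafka topic partition count is a property of the topic itself, not a Spark setting.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ReportedSettings {
    // Spark configuration keys for the settings listed in the report.
    static Map<String, String> sparkConf() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.master", "local[4]");
        conf.put("spark.driver.memory", "4g"); // must be set before the driver JVM starts
        conf.put("spark.sql.shuffle.partitions", "25");
        return conf;
    }

    // Kafka source options. ASSUMPTION: "micro-batch size" is enforced via maxOffsetsPerTrigger.
    static Map<String, String> kafkaSourceOptions() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("maxOffsetsPerTrigger", "5000"); // upper bound on offsets read per micro-batch
        return opts;
    }

    // Renders the conf as spark-submit flags, using dedicated flags where they exist.
    static String toSubmitArgs(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            switch (e.getKey()) {
                case "spark.master":
                    sb.append("--master ").append(e.getValue());
                    break;
                case "spark.driver.memory":
                    sb.append("--driver-memory ").append(e.getValue());
                    break;
                default:
                    sb.append("--conf ").append(e.getKey()).append('=').append(e.getValue());
            }
            sb.append(' ');
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(toSubmitArgs(sparkConf()));
        System.out.println(kafkaSourceOptions());
        // The 120 s trigger is set on the stream writer, e.g. Trigger.ProcessingTime("120 seconds").
    }
}
```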

Another observation: the exception does not occur after the first couple of batches, and subsequent 100K loads run fine as long as we don't restart the application.

Additional information about the load: the payload is the same for all events, but each of the 100K events has a different key. The exception is seen only in the first couple of batches (maybe 2-3); the remaining batches work fine.

Any guidance on tuning, or a fix that could resolve this exception, would be appreciated.

Thanks.

Full stack trace:

[Executor task launch worker for task 9.0 in stage 59.0 (TID 784)] o.a.s.e.Executor ERROR - Exception in task 9.0 in stage 59.0 (TID 784)
java.lang.ClassCastException: [C cannot be cast to [J
	at org.apache.spark.unsafe.memory.HeapMemoryAllocator.allocate(HeapMemoryAllocator.java:58)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:314)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:401)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:420)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:455)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.aggregate.SortAggregateExec.$anonfun$doExecute$1(SortAggregateExec.scala:58)
	at org.apache.spark.sql.execution.aggregate.SortAggregateExec.$anonfun$doExecute$1$adapted(SortAggregateExec.scala:55)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:126)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
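
One general Java point may help when reading this trace: a cast of an element taken from a generic container is checked where the element is read back, not where it was stored. If, as we understand it, Spark's heap allocator keeps freed `long[]` pages in a pool of generic references and casts them back on reuse, a `ClassCastException` at `HeapMemoryAllocator.allocate` would mark the reuse site rather than whatever introduced the wrong array. The sketch below is a hypothetical, heavily simplified pool (`LongArrayPool` and its methods are our names, not Spark classes) that demonstrates this erasure behavior; it illustrates the failure mode and is not a diagnosis of this bug.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified pool of long[] pages keyed by length (not Spark's implementation).
final class LongArrayPool {
    private final Map<Integer, Deque<WeakReference<long[]>>> poolsBySize = new HashMap<>();
    private Object pinned; // keeps the bogus array strongly reachable so the demo is deterministic

    // Returns a pooled long[] of the given length if one is still alive, else a fresh one.
    long[] allocate(int size) {
        Deque<WeakReference<long[]>> pool = poolsBySize.get(size);
        while (pool != null && !pool.isEmpty()) {
            long[] array = pool.pop().get(); // compiler-inserted checkcast to long[] runs here
            if (array != null) {
                return array;
            }
        }
        return new long[size];
    }

    // Returns a page to the pool for later reuse.
    void free(long[] array) {
        poolsBySize.computeIfAbsent(array.length, k -> new ArrayDeque<>())
                .push(new WeakReference<>(array));
    }

    // Simulates a bug elsewhere that stores a char[] through a raw (unchecked) reference.
    @SuppressWarnings({"unchecked", "rawtypes"})
    void corrupt(int size) {
        pinned = new char[size];
        Deque raw = poolsBySize.computeIfAbsent(size, k -> new ArrayDeque<>());
        raw.push(new WeakReference(pinned)); // no error here: generic types are erased
    }

    public static void main(String[] args) {
        LongArrayPool pool = new LongArrayPool();
        pool.corrupt(16);          // the bad entry is stored silently...
        try {
            pool.allocate(16);     // ...and is only detected when it is reused
        } catch (ClassCastException e) {
            System.out.println("failed at reuse: " + e.getMessage());
        }
    }
}
```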



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
