Frederik Schreiber created SPARK-47615:
------------------------------------------

             Summary: Aggregate + First() Function - ArrayIndexOutOfBoundsException - ColumnPruning?
                 Key: SPARK-47615
                 URL: https://issues.apache.org/jira/browse/SPARK-47615
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 3.5.0, 3.4.1
         Environment: Amazon EMR version: emr-7.0.0
                      Installed applications: Tez 0.10.2, Spark 3.5.0
                      Amazon Linux release: 2023.3.20240312.0
                      1 master node (m6g.xlarge)
                      2 core nodes (m6g.2xlarge)
            Reporter: Frederik Schreiber


I am currently working on upgrading our code base from Spark 3.3.0 to 3.5.0 
(running on a dedicated AWS EMR cluster).
 
When I run my code on the cluster, I get the following exception. When I run 
the local unit tests, the code works as expected, without any exception.
 
 
{code:java}
24/03/26 19:32:19 INFO RecordServerQueryListener: Cleaning up temp directory - /user/KKQI7VHKTMNQZJQNMMZXKH5KYNRPOHXG/application_1711468652551_0023
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 186) (ip-10-1-1-6.eu-central-1.compute.internal executor 2): java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3
    at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:95)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_parquetMax_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:143)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3067)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3003)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3002)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3002)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1318)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1318)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1318)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3271)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:154)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:277)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:276)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:558)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:520)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4411)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4401)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:625)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4399)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4399)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3377)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:3384)
    at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.determineMaxDateId(MyMaintainedClass.scala:56)
    at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.processInternal(MyMaintainedClass.scala:25)
    at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.processInternal$(MyMaintainedClass.scala:22)
    at de.my.maintained.code.common.app.FqmApp$$anon$35.processInternal(FqmApp.scala:112)
    at de.my.maintained.code.common.transformer.Transformer.process(Transformer.scala:26)
    at de.my.maintained.code.common.transformer.Transformer.process$(Transformer.scala:24)
    at de.my.maintained.code.common.app.FqmApp$$anon$35.process(FqmApp.scala:112)
    at de.my.maintained.code.business.transformer.BusinessForecastTransformerComponent$BusinessForecastTransformer.processInternal(BusinessForecastTransformerComponent.scala:35)
    at de.my.maintained.code.business.transformer.BusinessForecastTransformerComponent$BusinessForecastTransformer.processInternal$(BusinessForecastTransformerComponent.scala:26)
    at de.my.maintained.code.common.app.FqmApp$$anon$33.processInternal(FqmApp.scala:110)
    at de.my.maintained.code.common.transformer.Transformer.process(Transformer.scala:26)
    at de.my.maintained.code.common.transformer.Transformer.process$(Transformer.scala:24)
    at de.my.maintained.code.common.app.FqmApp$$anon$33.process(FqmApp.scala:110)
    at de.my.maintained.code.business.aws.BusinessStage2Forecast$.main(BusinessStage2Forecast.scala:10)
    at de.my.maintained.code.business.aws.BusinessStage2Forecast.main(BusinessStage2Forecast.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3
    at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:95)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_parquetMax_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:143)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}
 
 
A little earlier in the log file I found the following:
 
24/03/26 19:32:18 INFO DAGScheduler: Submitting 16 missing tasks from ShuffleMapStage 12 (MapPartitionsRDD[28] at first at MyMaintainedClass.scala:56) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
 
 
Jumping to that line in our code, I found this:
 
 
{code:java}
52: val first_row =
53:  df_usage_current_and_next_month
54:   .filter(args.billingDate.filter_current_month($"year", $"month"))
55:   .withColumn("DATE_ID", $"year" * 10000 + $"month" * 100 + date_format(dateColumn, "dd").cast(IntegerType))
56:   .agg(max("DATE_ID")).first()
{code}
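The DATE_ID expression on line 55 packs year, month and day of month into a single yyyyMMdd integer. A minimal plain-Scala sketch of the same arithmetic, with no Spark involved (the object and method names are made up for illustration):

```scala
import java.time.LocalDate

object DateIdSketch {
  // Same arithmetic as the column expression: year * 10000 + month * 100 + day of month.
  def dateId(date: LocalDate): Int =
    date.getYear * 10000 + date.getMonthValue * 100 + date.getDayOfMonth

  def main(args: Array[String]): Unit = {
    // 2024 * 10000 + 3 * 100 + 26
    println(dateId(LocalDate.of(2024, 3, 26))) // prints 20240326
  }
}
```

Because the encoding keeps chronological order, max("DATE_ID") returns the latest date in the filtered month.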
 
The problem seems to occur in the last line, ".agg(max("DATE_ID")).first()".
 
Next, I identified all places in our code that throw this exception. All of 
them use an aggregation (min/max/count) followed by a call to first().
 
I then searched our code base for every use of first() or head() and found one 
example that does not throw an ArrayIndexOutOfBoundsException. The Stack 
Overflow post (link at the bottom) gave me a hint. Every time we do a 
spark.read (Parquet), aggregate and call first(), AND then use the same 
DataFrame afterwards for further calculations, filtering and writing, we get an 
ArrayIndexOutOfBoundsException. If we only do agg and first(), there is no 
problem.
 
So something mutable must be involved when the data source is read and the 
execution plan is split into two branches (I don't know exactly what happens 
there). In my opinion, an optimization intervenes that removes columns which 
are supposedly not needed but actually are. Unfortunately I am not able to 
reproduce this locally, only on the AWS EMR cluster (maybe the configuration 
differs there).
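One way to test the suspicion about differing configuration is to dump the session settings and the plan of the aggregate query in both environments and diff the output. The following is only a sketch under assumptions: the object name, the temporary Parquet data and the column values are hypothetical stand-ins, and master("local[*]") is there only so it runs on a workstation (on the cluster spark-submit would supply the master).

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

object CompareEnvSketch {
  // Sorted "key=value" lines for the settings visible to this session, easy to diff.
  def confLines(spark: SparkSession): Seq[String] =
    spark.conf.getAll.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run only
      .appName("spark-47615-compare-env")
      .getOrCreate()
    import spark.implicits._

    confLines(spark).foreach(println)

    // Hypothetical stand-in for the real Parquet input.
    val path = Files.createTempDirectory("spark-47615").resolve("usage").toString
    Seq((2024, 3, 20240325), (2024, 3, 20240326))
      .toDF("year", "month", "DATE_ID")
      .write.parquet(path)

    // Prints the parsed, analyzed, optimized and physical plans of the aggregate.
    spark.read.parquet(path).agg(max("DATE_ID")).explain(true)

    spark.stop()
  }
}
```

Comparing the physical plans may show whether the cluster uses a different scan or aggregate operator than the local run.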
 
Workarounds that avoid the ArrayIndexOutOfBoundsException:
 
1. use an explicit spark.read for every agg and first() call
OR
2. call persist() between agg and first()
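The two workarounds could look roughly like the sketch below. It is an illustration under assumptions, not the original code: the object and method names, the temporary Parquet data and the local master are hypothetical, and since the failure was only seen on EMR this sketch does not demonstrate the bug itself.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.max

object WorkaroundSketch {
  // Workaround 1: a dedicated spark.read that is used only for agg + first().
  def maxViaSeparateRead(spark: SparkSession, path: String): Int =
    spark.read.parquet(path).agg(max("DATE_ID")).first().getInt(0)

  // Workaround 2: persist() between agg and first().
  def maxViaPersist(df: DataFrame): Int = {
    val aggregated = df.agg(max("DATE_ID")).persist()
    try aggregated.first().getInt(0)
    finally aggregated.unpersist()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run only
      .appName("spark-47615-workarounds")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the real Parquet input.
    val path = Files.createTempDirectory("spark-47615").resolve("usage").toString
    Seq((2024, 3, 20240325), (2024, 3, 20240326))
      .toDF("year", "month", "DATE_ID")
      .write.parquet(path)

    val df = spark.read.parquet(path)
    val viaRead = maxViaSeparateRead(spark, path)
    val viaPersist = maxViaPersist(df)

    // The shared DataFrame is still used afterwards, as in the failing pattern.
    val rowsAtMax = df.filter($"DATE_ID" === viaPersist).count()
    println(s"separateRead=$viaRead persist=$viaPersist rowsAtMax=$rowsAtMax")

    spark.stop()
  }
}
```

Workaround 1 costs an extra scan of the input; workaround 2 caches only the single aggregated row, which is released again with unpersist().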
 
 
Similar problem mentioned on Stack Overflow:
https://stackoverflow.com/questions/53483406/spark-sql-dataframe-count-gives-java-lang-arrayindexoutofboundsexception
 
 
Versions tested:
 
Spark 3.3.0 (EMR 6.11.1): no problem
Spark 3.4.1 (EMR 6.15.1): ArrayIndexOutOfBoundsException
Spark 3.5.0 (EMR 7.0.0): ArrayIndexOutOfBoundsException
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
