Frederik Schreiber created SPARK-47615:
------------------------------------------
Summary: Aggregate + First() Function - ArrayIndexOutOfBoundsException - ColumnPruning?
Key: SPARK-47615
URL: https://issues.apache.org/jira/browse/SPARK-47615
Project: Spark
Issue Type: Bug
Components: Optimizer
Affects Versions: 3.5.0, 3.4.1
Environment: Amazon EMR version
emr-7.0.0
Installed applications
Tez 0.10.2, Spark 3.5.0
Amazon Linux release
2023.3.20240312.0
1 Master Node m6g.xlarge
2 Core Nodes m6g.2xlarge
Reporter: Frederik Schreiber
I am currently investigating an upgrade of our code base from Spark 3.3.0 to 3.5.0
(embedded in a dedicated AWS EMR cluster).
I get the following exception when I execute my code on the cluster; when I run
local unit tests, the code runs as expected without the exception.
{code:java}
24/03/26 19:32:19 INFO RecordServerQueryListener: Cleaning up temp directory - /user/KKQI7VHKTMNQZJQNMMZXKH5KYNRPOHXG/application_1711468652551_0023
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 186) (ip-10-1-1-6.eu-central-1.compute.internal executor 2): java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3
    at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:95)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_parquetMax_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:143)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3067)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3003)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3002)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3002)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1318)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1318)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1318)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3271)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:154)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:277)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:276)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:558)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:520)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4411)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4401)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:625)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4399)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4399)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3377)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:3384)
    at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.determineMaxDateId(MyMaintainedClass.scala:56)
    at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.processInternal(MyMaintainedClass.scala:25)
    at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.processInternal$(MyMaintainedClass.scala:22)
    at de.my.maintained.code.common.app.FqmApp$$anon$35.processInternal(FqmApp.scala:112)
    at de.my.maintained.code.common.transformer.Transformer.process(Transformer.scala:26)
    at de.my.maintained.code.common.transformer.Transformer.process$(Transformer.scala:24)
    at de.my.maintained.code.common.app.FqmApp$$anon$35.process(FqmApp.scala:112)
    at de.my.maintained.code.business.transformer.BusinessForecastTransformerComponent$BusinessForecastTransformer.processInternal(BusinessForecastTransformerComponent.scala:35)
    at de.my.maintained.code.business.transformer.BusinessForecastTransformerComponent$BusinessForecastTransformer.processInternal$(BusinessForecastTransformerComponent.scala:26)
    at de.my.maintained.code.common.app.FqmApp$$anon$33.processInternal(FqmApp.scala:110)
    at de.my.maintained.code.common.transformer.Transformer.process(Transformer.scala:26)
    at de.my.maintained.code.common.transformer.Transformer.process$(Transformer.scala:24)
    at de.my.maintained.code.common.app.FqmApp$$anon$33.process(FqmApp.scala:110)
    at de.my.maintained.code.business.aws.BusinessStage2Forecast$.main(BusinessStage2Forecast.scala:10)
    at de.my.maintained.code.business.aws.BusinessStage2Forecast.main(BusinessStage2Forecast.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3
    at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:95)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_parquetMax_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:143)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840){code}
A little earlier in the log file I found the following:
24/03/26 19:32:18 INFO DAGScheduler: Submitting 16 missing tasks from ShuffleMapStage 12 (MapPartitionsRDD[28] at first at MyMaintainedClass.scala:56) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
Jumping to the referenced code (MyMaintainedClass.scala:56), I found this:
{code:java}
52:   val first_row =
53:     df_usage_current_and_next_month
54:       .filter(args.billingDate.filter_current_month($"year", $"month"))
55:       .withColumn("DATE_ID", $"year" * 10000 + $"month" * 100 + date_format(dateColumn, "dd").cast(IntegerType))
56:       .agg(max("DATE_ID")).first() {code}
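For context: the DATE_ID expression on line 55 packs year, month, and day into one sortable integer, which is why max("DATE_ID") yields the latest date. A quick worked example (my illustration, not part of the original code):
{code:java}
// DATE_ID encoding from line 55: year * 10000 + month * 100 + day
// e.g. 2024-03-26 -> 2024 * 10000 + 3 * 100 + 26 = 20240326
val year = 2024
val month = 3
val day = 26
val dateId = year * 10000 + month * 100 + day // 20240326, sorts like the date itself
{code}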
The problem seems to occur in the last line, .agg(max("DATE_ID")).first().
Next I identified all places in our code that hit this exception. All of them use an aggregation (min/max/count) followed by a call to first().
After that I searched our code base for all uses of the first() or head() function, and I found one example that does not trigger an ArrayIndexOutOfBoundsException. The Stack Overflow post (link at the bottom) gave me a hint. Every time we do a spark.read (Parquet) with an aggregation plus a first() call AND afterwards reuse the same DataFrame for other calculations, filters, and writes, we get an ArrayIndexOutOfBoundsException. If we only do the agg and first(), there is no problem.
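Reduced to its core, the failing pattern looks like this (a minimal sketch; the path, column name, and second branch are hypothetical stand-ins for our actual code):
{code:java}
import org.apache.spark.sql.functions.max
import spark.implicits._

// One shared DataFrame read from Parquet (hypothetical path/column):
val df = spark.read.parquet("s3://some-bucket/usage/")

// Branch 1: aggregation + first() -> ArrayIndexOutOfBoundsException on the cluster
val maxDateId = df.agg(max("DATE_ID")).first().getInt(0)

// Branch 2: reusing the SAME DataFrame for further filtering/writing
df.filter($"DATE_ID" === maxDateId)
  .write.parquet("s3://some-bucket/out/")

// With only branch 1 (no reuse of df), the job runs fine.
{code}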
So there must be some mutable state involved in reading the data source when the execution plan splits into two branches (I don't know what exactly happens there). In my opinion, an optimization mechanism intervenes there and removes certain columns that are supposedly not needed but in fact are. Unfortunately I am not able to reproduce it locally, only on the AWS EMR cluster (maybe a configuration differs there).
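If it helps triage, the column-pruning suspicion could be tested by comparing the plans and by temporarily excluding the suspected rule via the standard spark.sql.optimizer.excludedRules config. A diagnostic sketch, not a fix (whether this particular rule is excludable in a given build is an assumption on my part):
{code:java}
// 1) Compare the optimized/physical plans of the failing and working variants:
df_usage_current_and_next_month.agg(max("DATE_ID")).explain("formatted")

// 2) Temporarily exclude the ColumnPruning rule and re-run the job:
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ColumnPruning")
{code}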
Workarounds that avoid the ArrayIndexOutOfBoundsException (both sketched below):
1. use an explicit spark.read for every agg + first() call
OR
2. use a persist() between the read and the agg + first() call
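Sketch of both workarounds, with the same hypothetical names as in the sketch above:
{code:java}
// Workaround 1: a separate, explicit read just for the agg + first() branch,
// so the aggregation never shares a DataFrame with the later processing:
val maxDateId = spark.read.parquet("s3://some-bucket/usage/")
  .agg(max("DATE_ID")).first().getInt(0)

// Workaround 2: persist() the shared DataFrame before calling agg + first():
val df = spark.read.parquet("s3://some-bucket/usage/").persist()
val maxDateId2 = df.agg(max("DATE_ID")).first().getInt(0)
// ... reuse df for filtering/writing, then df.unpersist() when done
{code}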
A similar problem is mentioned on Stack Overflow:
https://stackoverflow.com/questions/53483406/spark-sql-dataframe-count-gives-java-lang-arrayindexoutofboundsexception
Versions tested:
Spark 3.3.0 (no problem) (EMR 6.11.1)
Spark 3.4.1 (ArrayIndexOutOfBoundsException) (EMR 6.15.1)
Spark 3.5.0 (ArrayIndexOutOfBoundsException) (EMR 7.0.0)