[ 
https://issues.apache.org/jira/browse/HUDI-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-8803:
---------------------------------
    Labels: pull-request-available  (was: )

> Fix ArrayIndexOutOfBoundsException while vectorized read with schema evolution
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-8803
>                 URL: https://issues.apache.org/jira/browse/HUDI-8803
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Yinsheng Wang
>            Priority: Major
>              Labels: pull-request-available
>
> When reading a Parquet file with the vectorized reader and schema evolution 
> enabled, the read occasionally fails with an ArrayIndexOutOfBoundsException:
> {code:java}
> 24/11/25 11:53:38 [Executor task launch worker for task 9] ERROR Executor: 
> Exception in task 0.5 in stage 3.0 (TID 9)
> java.lang.ArrayIndexOutOfBoundsException: -1
>       at 
> org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.isNullAt(OnHeapColumnVector.java:130)
>       at 
> org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:396)
>       at 
> org.apache.spark.sql.vectorized.ColumnarBatchRow.getUTF8String(ColumnarBatch.java:220)
>       at 
> org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
>       at 
> org.apache.hudi.RecordMergingFileIterator.hasNextInternal(Iterators.scala:200)
>       at 
> org.apache.hudi.RecordMergingFileIterator.doHasNext(Iterators.scala:192)
>       at 
> org.apache.hudi.util.CachingIterator.hasNext(CachingIterator.scala:36)
>       at 
> org.apache.hudi.util.CachingIterator.hasNext$(CachingIterator.scala:36)
>       at org.apache.hudi.LogFileIterator.hasNext(Iterators.scala:60)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>       at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:355)
>       at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:878)
>       at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:878)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:355)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:319)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:129)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:478)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1480)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:481)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>  {code}
> The root cause: `Spark3HoodieVectorizedParquetRecordReader` extends 
> `VectorizedParquetRecordReader`, and each class declares its own member 
> variable named "batchIdx". The subclass only ever increments its own copy, 
> so the superclass's "batchIdx" is never updated, and the exception occurs 
> when `super.getCurrentValue()` is called and indexes the batch with 
> `batchIdx - 1 == -1`.
> {code:java}
> // code in Spark3HoodieVectorizedParquetRecordReader
> @Override
> public Object getCurrentValue() {
>   if (typeChangeInfos == null || typeChangeInfos.isEmpty()) {
>     return super.getCurrentValue();
>   }
>   if (returnColumnarBatch) {
>     return columnarBatch == null ? super.getCurrentValue() : columnarBatch;
>   }
>   return columnarBatch == null ? super.getCurrentValue() : 
> columnarBatch.getRow(batchIdx - 1);
> }
> @Override
> public boolean nextKeyValue() throws IOException {
>   resultBatch();
>   if (returnColumnarBatch)  {
>     return nextBatch();
>   }
>   if (batchIdx >= numBatched) {
>     if (!nextBatch()) {
>       return false;
>     }
>   }
>   ++batchIdx;
>   return true;
> } {code}
> {code:java}
> // VectorizedParquetRecordReader
> @Override
> public boolean nextKeyValue() throws IOException {
>   resultBatch();
>   if (returnColumnarBatch) return nextBatch();
>   if (batchIdx >= numBatched) {
>     if (!nextBatch()) return false;
>   }
>   ++batchIdx;
>   return true;
> }
> @Override
> public Object getCurrentValue() {
>   if (returnColumnarBatch) return columnarBatch;
>   return columnarBatch.getRow(batchIdx - 1);
> } {code}
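The shadowed-field mechanism above can be reduced to a minimal sketch (hypothetical class names, not Hudi code): because Java resolves field accesses statically, a method defined in the parent always reads the parent's copy of the field, even when the subclass declares and increments a field with the same name.

```java
// Minimal illustration of the field-shadowing bug: the subclass declares its
// own batchIdx, so the parent's copy never advances and the parent's
// "batchIdx - 1" index stays at -1.
public class ShadowDemo {

    static class BaseReader {
        protected int batchIdx = 0;       // parent's copy, read by parent methods

        public int currentRowIndex() {
            return batchIdx - 1;          // resolves to BaseReader.batchIdx
        }
    }

    static class ShadowingReader extends BaseReader {
        protected int batchIdx = 0;       // shadows the parent's field

        public void nextKeyValue() {
            ++batchIdx;                   // only the subclass's copy advances
        }
    }

    public static void main(String[] args) {
        ShadowingReader reader = new ShadowingReader();
        reader.nextKeyValue();
        reader.nextKeyValue();
        // Parent method still sees its own batchIdx == 0, so this prints -1,
        // mirroring the invalid row index passed to columnarBatch.getRow(...).
        System.out.println(reader.currentRowIndex());
    }
}
```

The fix direction implied by the issue is for the subclass to update the inherited field (e.g. via `super.nextKeyValue()` or by not redeclaring `batchIdx`) rather than maintaining a shadowed copy.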



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
