[
https://issues.apache.org/jira/browse/HUDI-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated HUDI-8803:
---------------------------------
Labels: pull-request-available (was: )
> Fix ArrayIndexOutOfBoundsException while vectorized read with schema evolution
> ------------------------------------------------------------------------------
>
> Key: HUDI-8803
> URL: https://issues.apache.org/jira/browse/HUDI-8803
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Yinsheng Wang
> Priority: Major
> Labels: pull-request-available
>
> While reading a Parquet file with the vectorized reader and schema evolution enabled, the read occasionally fails with an ArrayIndexOutOfBoundsException:
> {code:java}
> 24/11/25 11:53:38 [Executor task launch worker for task 9] ERROR Executor: Exception in task 0.5 in stage 3.0 (TID 9)
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.isNullAt(OnHeapColumnVector.java:130)
>   at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:396)
>   at org.apache.spark.sql.vectorized.ColumnarBatchRow.getUTF8String(ColumnarBatch.java:220)
>   at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
>   at org.apache.hudi.RecordMergingFileIterator.hasNextInternal(Iterators.scala:200)
>   at org.apache.hudi.RecordMergingFileIterator.doHasNext(Iterators.scala:192)
>   at org.apache.hudi.util.CachingIterator.hasNext(CachingIterator.scala:36)
>   at org.apache.hudi.util.CachingIterator.hasNext$(CachingIterator.scala:36)
>   at org.apache.hudi.LogFileIterator.hasNext(Iterators.scala:60)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:355)
>   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:878)
>   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:878)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:355)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:319)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:129)
>   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:478)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1480)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:481)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The root cause is that `Spark3HoodieVectorizedParquetRecordReader` extends `VectorizedParquetRecordReader`, and both classes declare a member variable named `batchIdx`. The subclass's field shadows the parent's, and `Spark3HoodieVectorizedParquetRecordReader` never updates the parent's copy, so when `super.getCurrentValue()` is called, the parent reads its own stale `batchIdx` and the exception occurs.
> {code:java}
> // Spark3HoodieVectorizedParquetRecordReader
> @Override
> public Object getCurrentValue() {
>   if (typeChangeInfos == null || typeChangeInfos.isEmpty()) {
>     return super.getCurrentValue();
>   }
>   if (returnColumnarBatch) {
>     return columnarBatch == null ? super.getCurrentValue() : columnarBatch;
>   }
>   return columnarBatch == null ? super.getCurrentValue() : columnarBatch.getRow(batchIdx - 1);
> }
>
> @Override
> public boolean nextKeyValue() throws IOException {
>   resultBatch();
>   if (returnColumnarBatch) {
>     return nextBatch();
>   }
>   if (batchIdx >= numBatched) {
>     if (!nextBatch()) {
>       return false;
>     }
>   }
>   ++batchIdx;
>   return true;
> }
> {code}
> {code:java}
> // VectorizedParquetRecordReader
> @Override
> public boolean nextKeyValue() throws IOException {
>   resultBatch();
>   if (returnColumnarBatch) return nextBatch();
>   if (batchIdx >= numBatched) {
>     if (!nextBatch()) return false;
>   }
>   ++batchIdx;
>   return true;
> }
>
> @Override
> public Object getCurrentValue() {
>   if (returnColumnarBatch) return columnarBatch;
>   return columnarBatch.getRow(batchIdx - 1);
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)