wangyinsheng opened a new pull request, #12560:
URL: https://github.com/apache/hudi/pull/12560

   ### Change Logs
   
   While vectorized reading a Parquet file with schema evolution, the read 
occasionally fails with an `ArrayIndexOutOfBoundsException`:
   
   ```
   24/11/25 11:53:38 [Executor task launch worker for task 9] ERROR Executor: Exception in task 0.5 in stage 3.0 (TID 9)
   java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.isNullAt(OnHeapColumnVector.java:130)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:396)
        at org.apache.spark.sql.vectorized.ColumnarBatchRow.getUTF8String(ColumnarBatch.java:220)
        at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
        at org.apache.hudi.RecordMergingFileIterator.hasNextInternal(Iterators.scala:200)
        at org.apache.hudi.RecordMergingFileIterator.doHasNext(Iterators.scala:192)
        at org.apache.hudi.util.CachingIterator.hasNext(CachingIterator.scala:36)
        at org.apache.hudi.util.CachingIterator.hasNext$(CachingIterator.scala:36)
        at org.apache.hudi.LogFileIterator.hasNext(Iterators.scala:60)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:355)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:878)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:878)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:355)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:319)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:129)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:478)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1480)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:481)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
   ```
   
   The root cause is that `Spark3HoodieVectorizedParquetRecordReader` extends 
   `VectorizedParquetRecordReader`, and both classes declare a member variable 
named `batchIdx`. The subclass's field shadows the superclass's, so 
`Spark3HoodieVectorizedParquetRecordReader` only ever advances its own copy 
and the superclass's `batchIdx` stays at 0. When the call falls through to 
`super.getCurrentValue()`, the stale index (`batchIdx - 1 == -1`) triggers the 
exception.
   
   ```java
   // code in Spark3HoodieVectorizedParquetRecordReader
   
   @Override
   public Object getCurrentValue() {
     if (typeChangeInfos == null || typeChangeInfos.isEmpty()) {
       return super.getCurrentValue();
     }
   
     if (returnColumnarBatch) {
       return columnarBatch == null ? super.getCurrentValue() : columnarBatch;
     }
   
     return columnarBatch == null ? super.getCurrentValue() : columnarBatch.getRow(batchIdx - 1);
   }
   
   @Override
   public boolean nextKeyValue() throws IOException {
     resultBatch();
   
     if (returnColumnarBatch)  {
       return nextBatch();
     }
   
     if (batchIdx >= numBatched) {
       if (!nextBatch()) {
         return false;
       }
     }
     ++batchIdx;
     return true;
   } 
   ```
   
   ```java
   // VectorizedParquetRecordReader
   
   @Override
   public boolean nextKeyValue() throws IOException {
     resultBatch();
   
     if (returnColumnarBatch) return nextBatch();
   
     if (batchIdx >= numBatched) {
       if (!nextBatch()) return false;
     }
     ++batchIdx;
     return true;
   }
   
   @Override
   public Object getCurrentValue() {
     if (returnColumnarBatch) return columnarBatch;
     return columnarBatch.getRow(batchIdx - 1);
   } 
   ```
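
   The shadowing pitfall can be reproduced with a minimal, self-contained sketch (the class names `Parent`, `Child`, and `ShadowDemo` are hypothetical, standing in for `VectorizedParquetRecordReader` and `Spark3HoodieVectorizedParquetRecordReader`):

   ```java
   // In Java, a field declared in a subclass HIDES (shadows) the superclass
   // field of the same name; it does not override it. Methods inherited from
   // the superclass keep reading the superclass's copy.
   class Parent {
       protected int batchIdx = 0;

       public boolean nextKeyValue() {
           ++batchIdx;
           return true;
       }

       public Object getCurrentValue() {
           // Always reads Parent.batchIdx, even when called on a Child.
           return "row-" + (batchIdx - 1);
       }
   }

   class Child extends Parent {
       protected int batchIdx = 0; // shadows Parent.batchIdx

       @Override
       public boolean nextKeyValue() {
           ++batchIdx; // advances only Child.batchIdx
           return true;
       }
   }

   public class ShadowDemo {
       public static void main(String[] args) {
           Child c = new Child();
           c.nextKeyValue();
           // Child.batchIdx is now 1, but Parent.batchIdx is still 0, so the
           // inherited getCurrentValue() computes index -1 -- the same stale
           // index that surfaces as ArrayIndexOutOfBoundsException above.
           System.out.println("child batchIdx = " + c.batchIdx);          // 1
           System.out.println("parent batchIdx = " + ((Parent) c).batchIdx); // 0
           System.out.println("value = " + c.getCurrentValue());          // row--1
       }
   }
   ```

   This is why the reader must either reuse the superclass's `batchIdx` (not redeclare it) or keep the two counters in sync before delegating to `super.getCurrentValue()`.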
   ### Impact
   
   None
   
   ### Risk level (write none, low medium or high below)
   
   medium
   ### Documentation Update
   None
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   

