hudi-bot opened a new issue, #15413:
URL: https://github.com/apache/hudi/issues/15413

   parquet-mr fails to read nested array fields.
   
   See the following issues for more discussion:
   
   https://github.com/apache/hudi/issues/5985
   
   https://github.com/apache/hudi/issues/5701
   
   Essentially, Hudi needs to upgrade its Parquet version once the following Parquet issues are fixed:
   
   https://issues.apache.org/jira/browse/PARQUET-1254
   
   https://issues.apache.org/jira/browse/PARQUET-1681
   
   https://issues.apache.org/jira/browse/PARQUET-2069
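   
   For context, the ambiguity these issues deal with comes from the two list encodings Parquet permits. A hand-written sketch (not dumped from an actual file) of how a column like `array_struct ARRAY<STRUCT<inner_f3: STRING>>` can appear in a footer:
   
   ```
   // Standard 3-level encoding: "list" is a synthetic wrapper, "element" is the struct.
   optional group array_struct (LIST) {
     repeated group list {
       optional group element {
         optional binary inner_f3 (STRING);
       }
     }
   }
   
   // Legacy 2-level encoding: the repeated group IS the struct element itself.
   optional group array_struct (LIST) {
     repeated group array {
       optional binary inner_f3 (STRING);
     }
   }
   ```
   
   Because the element struct has only a single field, a reader that guesses the wrong encoding ends up one nesting level off, which surfaces as a ClassCastException when the inner STRING field is treated as a group.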
   
    
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-4798
   - Type: Task
   - Epic: https://issues.apache.org/jira/browse/HUDI-6242
   - Fix version(s):
     - 1.1.0
   
   
   ---
   
   
   ## Comments
   
   **vhs** (06/May/25 02:45): Another relevant fix that is required:
   
   https://issues.apache.org/jira/browse/PARQUET-2450
   
    
   
   The test below reproduces it on master (commit e4d01dd791e6e41617bd4d9f1df22a792315d4c2):
   
    
   {code:scala}
   test("Repro Issue: Array of Struct with single inner field") {
     withTempDir { tmp =>
       val tableName = "hudi_type_test_mor"
       spark.sql(
         s"""
            |CREATE TABLE $tableName (
            |  uuid STRING,
            |  precombine_field LONG,
            |  col_double DOUBLE,
            |  array_struct ARRAY<STRUCT<inner_f3: STRING>>,
            |  part_col STRING
            |) USING hudi
            | LOCATION '${tmp.getCanonicalPath}'
            | TBLPROPERTIES (
            |  primaryKey = 'uuid',
            |  type = 'mor',
            |  preCombineField = 'precombine_field'
            | )
            | PARTITIONED BY (part_col)
         """.stripMargin)
       // directly write to new parquet file
       spark.sql(s"set hoodie.parquet.small.file.limit=0")
       spark.sql(s"set hoodie.metadata.compact.max.delta.commits=1")
       // partition stats index is enabled together with column stats index
       spark.sql(s"set hoodie.metadata.index.column.stats.enable=true")
       spark.sql(s"set hoodie.metadata.record.index.enable=true")
       spark.sql(s"set hoodie.metadata.index.secondary.enable=true")
   
       // Insert row 1 into partition 'A'
       spark.sql(
         s"""
            | INSERT INTO $tableName VALUES (
            |  'uuid1', 1000L, 1.1,
            |  array(struct('asd'), struct('ghj')),
            |  'A'
            | )
       """.stripMargin)
   
       spark.sql(s"CREATE INDEX idx_double ON $tableName (col_double)")
   
       // Generate log files through updates on partition 'A'
       // Allow time for async indexer to run so that error will be thrown
    spark.sql(s"UPDATE $tableName SET col_double = col_double + 100, precombine_field = precombine_field + 1 WHERE part_col = 'A'")
     }
   }{code}
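   
   To see why a single-field struct element is the trigger, here is a simplified, self-contained model of the list-element disambiguation that parquet-avro performs when it meets a repeated group under a LIST annotation (the real logic lives in `AvroRecordConverter`, discussed in PARQUET-1681; the method and parameter names here are illustrative, not the actual Parquet API):
   
   ```java
   // Hypothetical sketch of parquet-avro's "is this repeated group the list
   // element itself, or a wrapper around it?" heuristic.
   public class ListElementHeuristic {
   
       // Given the repeated group nested under a LIST annotation, decide whether
       // it is the element itself (legacy 2-level encoding) or a synthetic
       // wrapper around the element (standard 3-level encoding).
       static boolean repeatedGroupIsElement(String repeatedGroupName,
                                             int fieldCount,
                                             String listFieldName) {
           if (fieldCount > 1) {
               return true;  // a wrapper group holds exactly one field
           }
           if (repeatedGroupName.equals("array")
                   || repeatedGroupName.equals(listFieldName + "_tuple")) {
               return true;  // names used by legacy 2-level writers
           }
           // A one-field repeated group with any other name is assumed to be a
           // wrapper. For ARRAY<STRUCT<inner_f3: STRING>>, if the writer put the
           // single-field struct directly under the LIST, this guess is wrong and
           // the reader then casts the STRING field to a group, which fails.
           return false;
       }
   
       public static void main(String[] args) {
           // Struct with two fields: unambiguous, must be the element itself.
           System.out.println(repeatedGroupIsElement("list", 2, "array_struct")); // true
           // Struct with one field: ambiguous; the heuristic guesses "wrapper".
           System.out.println(repeatedGroupIsElement("list", 1, "array_struct")); // false
       }
   }
   ```
   
   The `fieldCount > 1` short-circuit is why only structs with exactly one inner field are affected; adding a second field to the struct makes the schema unambiguous.
   
   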
    
   
    
   
   Error:
   
    
   {code:java}
   Driver stacktrace:
       at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
       at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
       at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
       at scala.Option.foreach(Option.scala:407)
       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
       at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
       at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
       at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
       at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
       at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
       at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
       at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
       at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
       at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:109)
       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:204)
       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:171)
       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:151)
       at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
       at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
       at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
       at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:146)
       at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:64)
       at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1412)
       at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:136)
       at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:1047)
       at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:1105)
       at org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:277)
       ... 96 more
   
   Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
       at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
       at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
       at org.apache.hudi.common.table.log.HoodieFileSliceReader.hasNextInternal(HoodieFileSliceReader.java:73)
       at org.apache.hudi.common.table.log.HoodieFileSliceReader.doHasNext(HoodieFileSliceReader.java:99)
       at org.apache.hudi.common.util.collection.CachingIterator.hasNext(CachingIterator.java:32)
       at org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.getRecordKeyToSecondaryKey(SecondaryIndexRecordGenerationUtils.java:183)
       at org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.lambda$convertWriteStatsToSecondaryIndexRecords$8386a558$1(SecondaryIndexRecordGenerationUtils.java:138)
       at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:160)
       at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
       at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
       at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
       at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
       at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
       at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
       at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
       at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
       at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
       at org.apache.spark.scheduler.Task.run(Task.scala:141)
       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:750)
   
   Caused by: java.lang.ClassCastException: optional binary inner_f3 (STRING) is not a group
       at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
       at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:362)
       at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:306)
       at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:79)
       at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:617)
       at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:567)
       at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:371)
       at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:144)
       at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:98)
       at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
       at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:146)
       at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
       at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
       at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
       at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
       ... 39 more{code}
    
   
    
   
   This is fixed in parquet-avro / `parquet.version` **1.14.0**.
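   
   Once the upgrade is unblocked, picking up that release should be a small change in the root POM, assuming the build still routes the dependency through a `parquet.version` property (a sketch only; the property name and current build layout would need verifying against Hudi's pom.xml):
   
   ```xml
   <!-- root pom.xml: pick up the parquet-avro fix for single-field struct
        elements in lists (PARQUET-2450, released in Parquet 1.14.0) -->
   <properties>
     <parquet.version>1.14.0</parquet.version>
   </properties>
   ```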

