hudi-bot opened a new issue, #15413: URL: https://github.com/apache/hudi/issues/15413
parquet-mr fails to read nested array fields. See the following issues for more discussion:

- https://github.com/apache/hudi/issues/5985
- https://github.com/apache/hudi/issues/5701

Essentially, Hudi needs to upgrade its parquet version after the following parquet issues are fixed:

- https://issues.apache.org/jira/browse/PARQUET-1254
- https://issues.apache.org/jira/browse/PARQUET-1681
- https://issues.apache.org/jira/browse/PARQUET-2069

## JIRA info

- Link: https://issues.apache.org/jira/browse/HUDI-4798
- Type: Task
- Epic: https://issues.apache.org/jira/browse/HUDI-6242
- Fix version(s): 1.1.0

---

## Comments

**06/May/25 02:45, vhs:**

Another relevant fix that is required: https://issues.apache.org/jira/browse/PARQUET-2450

The test below reproduces it on master (commit e4d01dd791e6e41617bd4d9f1df22a792315d4c2):

{code:java}
test("Repro Issue: Array of Struct with single inner field") {
  withTempDir { tmp =>
    val tableName = "hudi_type_test_mor"
    spark.sql(
      s"""
         |CREATE TABLE $tableName (
         |  uuid STRING,
         |  precombine_field LONG,
         |  col_double DOUBLE,
         |  array_struct ARRAY<STRUCT<inner_f3: STRING>>,
         |  part_col STRING
         |) USING hudi
         |LOCATION '${tmp.getCanonicalPath}'
         |TBLPROPERTIES (
         |  primaryKey = 'uuid',
         |  type = 'mor',
         |  preCombineField = 'precombine_field'
         |)
         |PARTITIONED BY (part_col)
         """.stripMargin)

    // Directly write to a new parquet file
    spark.sql(s"set hoodie.parquet.small.file.limit=0")
    spark.sql(s"set hoodie.metadata.compact.max.delta.commits=1")
    // Partition stats index is enabled together with column stats index
    spark.sql(s"set hoodie.metadata.index.column.stats.enable=true")
    spark.sql(s"set hoodie.metadata.record.index.enable=true")
    spark.sql(s"set hoodie.metadata.index.secondary.enable=true")

    // Insert row 1 into partition 'A'
    spark.sql(
      s"""
         |INSERT INTO $tableName VALUES (
         |  'uuid1', 1000L, 1.1,
         |  array(struct('asd'), struct('ghj')),
         |  'A'
         |)
         """.stripMargin)

    spark.sql(s"CREATE INDEX idx_double ON $tableName (col_double)")

    // Generate log files through updates on partition 'A'.
    // Allow time for the async indexer to run so that the error is thrown.
    spark.sql(s"UPDATE $tableName SET col_double = col_double + 100, precombine_field = precombine_field + 1 WHERE part_col = 'A'")
  }
}
{code}
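For context on the `ClassCastException` in the trace below, here is a minimal, hypothetical sketch (plain parquet-avro, no Spark or Hudi; the object and record names are illustrative) that prints the Parquet layout the `array_struct ARRAY<STRUCT<inner_f3: STRING>>` column maps to. The element group carries exactly one field, which is the shape the parquet-avro reader's 2-level/3-level list detection can mis-classify (see PARQUET-1681 / PARQUET-2069 above):

{code:java}
import org.apache.avro.SchemaBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroSchemaConverter

// Sketch only: prints the Parquet message type for array<record{inner_f3: string}>.
object ListSchemaShape {
  def main(args: Array[String]): Unit = {
    // Avro schema mirroring the test column above.
    val element = SchemaBuilder.record("element").fields()
      .requiredString("inner_f3")
      .endRecord()
    val row = SchemaBuilder.record("row").fields()
      .name("array_struct").`type`().array().items(element).noDefault()
      .endRecord()

    // Whether the list is written 2-level (legacy) or 3-level depends on
    // parquet.avro.write-old-list-structure; either way, the single-field
    // element group is the ambiguous case the reader has to guess about.
    println(new AvroSchemaConverter(new Configuration()).convert(row))
  }
}
{code}

On the read side, the converter has to decide whether the repeated group is itself the list element or only a wrapper around it; when the guess goes wrong for a single-field struct, `newConverter` receives the binary `inner_f3` where it expects a group, which matches the `asGroupType` failure in the trace below.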
Error:

{code:java}
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
  at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
  at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
  at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
  at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:109)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:204)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:171)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:151)
  at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
  at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
  at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
  at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:146)
  at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:64)
  at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1412)
  at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:136)
  at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:1047)
  at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:1105)
  at org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:277)
  ... 96 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
  at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
  at org.apache.hudi.common.table.log.HoodieFileSliceReader.hasNextInternal(HoodieFileSliceReader.java:73)
  at org.apache.hudi.common.table.log.HoodieFileSliceReader.doHasNext(HoodieFileSliceReader.java:99)
  at org.apache.hudi.common.util.collection.CachingIterator.hasNext(CachingIterator.java:32)
  at org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.getRecordKeyToSecondaryKey(SecondaryIndexRecordGenerationUtils.java:183)
  at org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.lambda$convertWriteStatsToSecondaryIndexRecords$8386a558$1(SecondaryIndexRecordGenerationUtils.java:138)
  at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:160)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
  at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
  at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
  at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
  at org.apache.spark.scheduler.Task.run(Task.scala:141)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: optional binary inner_f3 (STRING) is not a group
  at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:362)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:306)
  at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:79)
  at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:617)
  at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:567)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:371)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:144)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:98)
  at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
  at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:146)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
  at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
  ... 39 more
{code}

This is fixed in parquet-avro (parquet.version) **1.14.0**.
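For anyone triaging a suspect base file outside Spark, here is a minimal sketch (the file path is hypothetical) that drives the same parquet-avro read path as the trace above. On parquet-avro versions before 1.14.0, the first `read()` is expected to fail with the same `ClassCastException`; on 1.14.0+ it should return records:

{code:java}
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader

// Sketch only: read a Hudi base file through parquet-avro directly.
object DirectParquetAvroRead {
  def main(args: Array[String]): Unit = {
    // Hypothetical path to a base file produced by the repro test above.
    val reader = AvroParquetReader
      .builder[GenericRecord](new Path("/tmp/hudi_type_test_mor/A/base-file.parquet"))
      .build()
    try {
      // Pre-1.14.0 parquet-avro is expected to throw the ClassCastException
      // from the trace above while materializing the first record.
      var record = reader.read()
      while (record != null) {
        println(record)
        record = reader.read()
      }
    } finally {
      reader.close()
    }
  }
}
{code}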
