boneanxs opened a new issue #5083: URL: https://github.com/apache/hudi/issues/5083
**To Reproduce**

Steps to reproduce the behavior:

1. **Bulk insert** the results from a table into a Hudi table (a plain insert does not cause this exception).
2. Run clustering on this table, which can fail with the exception below.
3. Spark can read these files without any exception.

```java
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
    at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:266)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at scala.collection.AbstractIterator.to(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
    at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.SchemaParseException: Can't redefine: list
    at org.apache.avro.Schema$Names.put(Schema.java:1128)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
    at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
    at org.apache.avro.Schema.toString(Schema.java:324)
    at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
    at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:475)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
    ... 31 more
```

I think this might be a bug in parquet-avro; related issue: https://github.com/apache/parquet-mr/pull/560 (fixed in 1.12).

Bulk insert uses `HoodieInternalRowParquetWriter` to write `InternalRow`s, while insert uses `HoodieParquetWriter` to write `IndexedRecord`s, so insert does not cause the exception. Spark does not use parquet-avro to read parquet files, so it has no problem reading them either.

As a test, I changed the clustering logic to read base files through a Spark DataFrame, which avoids this exception. This works well for COW tables, but it cannot handle MOR tables, because those also need to read log files. I am raising it here to seek help from the community.
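To illustrate the `Can't redefine: list` message in the trace above, here is a minimal sketch of a name registry that rejects a second, different definition under an already-registered name. It is a simplified stand-in and not Avro's actual code: the class `RedefineSketch`, its `Rec` type, and the `register` method are hypothetical names, and the scenario (two array columns that each yield a record named `list` with different element types) is an assumption about how this failure arises.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of a schema serializer's name registry (not Avro's code).
final class RedefineSketch {

    /** Stand-in for a named record schema: a name plus its field descriptions. */
    static final class Rec {
        final String name;
        final List<String> fields;

        Rec(String name, String... fields) {
            this.name = name;
            this.fields = Arrays.asList(fields);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Rec)) {
                return false;
            }
            Rec other = (Rec) o;
            return name.equals(other.name) && fields.equals(other.fields);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, fields);
        }
    }

    /**
     * Visits records in order, registering each by name.
     * An identical definition seen again is accepted (it would be written as a
     * name reference); a different definition under the same name is rejected.
     */
    static String register(List<Rec> records) {
        Map<String, Rec> seen = new HashMap<>();
        for (Rec r : records) {
            Rec prev = seen.get(r.name);
            if (r.equals(prev)) {
                continue; // same definition: reuse by name
            }
            if (prev != null) {
                return "Can't redefine: " + r.name; // same name, different shape
            }
            seen.put(r.name, r);
        }
        return "ok";
    }

    public static void main(String[] args) {
        // Assumed scenario: two array columns, each wrapped in a record named "list".
        System.out.println(register(Arrays.asList(
                new Rec("list", "element:int"),
                new Rec("list", "element:string"))));
        // The same definition repeated is not a conflict.
        System.out.println(register(Arrays.asList(
                new Rec("list", "element:int"),
                new Rec("list", "element:int"))));
    }
}
```

Under this model, a reader that never builds such a name registry (as Spark's own parquet reader is described above) would not hit the conflict, which is consistent with Spark reading the same files without error.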
My change:

```java
private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                              List<ClusteringOperation> clusteringOps) {
  SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
  HoodieWriteConfig writeConfig = getWriteConfig();

  // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
  //       closure, as this might lead to issues attempting to serialize its nested fields
  String[] dataFilePaths = clusteringOps.stream()
      .map(ClusteringOperation::getDataFilePath)
      .toArray(String[]::new);

  SQLContext sqlContext = new SQLContext(jsc.sc());
  Dataset<Row> inputFrame;
  final String extension = FSUtils.getFileExtension(dataFilePaths[0]);
  if (PARQUET.getFileExtension().equals(extension)) {
    inputFrame = sqlContext.read().format("parquet").load(dataFilePaths);
  } else if (ORC.getFileExtension().equals(extension)) {
    inputFrame = sqlContext.read().format("orc").load(dataFilePaths);
  } else {
    throw new IllegalArgumentException("Not supported");
  }

  Tuple2<String, String> avroRecordNameAndNamespace =
      AvroConversionUtils.getAvroRecordNameAndNamespace(writeConfig.getTableName());
  return HoodieSparkUtils.createRdd(inputFrame, avroRecordNameAndNamespace._1, avroRecordNameAndNamespace._2,
          scala.Option.apply(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()))))
      .toJavaRDD().map(record -> transform(record, writeConfig));
}
```

**Environment Description**

* Hudi version : 0.11-snapshot
* Spark version :
* Hive version :
* Hadoop version :
* Storage (HDFS/S3/GCS..) :
* Running on Docker? (yes/no) :

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
