boneanxs opened a new issue #5083:
URL: https://github.com/apache/hudi/issues/5083


   
   **Describe the problem you faced**
   
   Clustering a table that was written via **bulk insert** fails with `org.apache.avro.SchemaParseException: Can't redefine: list`. The same table written via a plain insert clusters without error.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. **Bulk insert** the results of a query into a Hudi table (a plain insert will not cause this exception).
   2. Run clustering on this table, which raises the exception below.
   3. Note that Spark itself can read these files without any exception.
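   
   For reference, a minimal writer option set that should reproduce steps 1–2 (the option keys are standard Hudi datasource/clustering configs; the table name and commit threshold are placeholders):
   
   ```properties
   # step 1: write with bulk insert instead of insert
   hoodie.datasource.write.operation=bulk_insert
   hoodie.table.name=my_table
   # step 2: trigger clustering inline after the write
   hoodie.clustering.inline=true
   hoodie.clustering.inline.max.commits=1
   ```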
   
   ```java
   org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
        at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:266)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
        at scala.collection.AbstractIterator.to(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
        at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.avro.SchemaParseException: Can't redefine: list
        at org.apache.avro.Schema$Names.put(Schema.java:1128)
        at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
        at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
        at org.apache.avro.Schema.toString(Schema.java:324)
        at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
        at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:475)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
        at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
        at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
        ... 31 more
   ```
   
   I think this might be a bug in parquet-avro; related fix: https://github.com/apache/parquet-mr/pull/560 (fixed in parquet 1.12). Bulk insert uses `HoodieInternalRowParquetWriter` to write `InternalRow`s, while insert uses `HoodieParquetWriter` to write `IndexedRecord`s, which is why insert does not trigger the exception.
   
   Spark itself doesn't use parquet-avro to read parquet files, which is why reading through Spark is unaffected.
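   
   To illustrate the failure mode: while stringifying a schema, Avro keeps a registry of named types and refuses to define the same full name twice, which is where `Can't redefine: list` comes from. If the converted schema gives the inner element record of every list column the same name (e.g. `list`), a table with two or more list columns collides. The sketch below uses plain Java stand-ins, not the real Avro classes:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Simplified, hypothetical model of org.apache.avro.Schema$Names:
   // one definition per full name, error on redefinition.
   public class RedefineListSketch {
   
     static final class Names {
       private final Map<String, String> defined = new HashMap<>();
   
       void put(String fullName, String definition) {
         if (defined.containsKey(fullName)) {
           throw new IllegalStateException("Can't redefine: " + fullName);
         }
         defined.put(fullName, definition);
       }
     }
   
     public static void main(String[] args) {
       Names names = new Names();
       // First list column defines a record named "list" -- fine.
       names.put("list", "record for column_a's elements");
       try {
         // Second list column tries to define "list" again -- fails.
         names.put("list", "record for column_b's elements");
       } catch (IllegalStateException e) {
         System.out.println(e.getMessage());  // Can't redefine: list
       }
     }
   }
   ```
   
   This matches why the error only surfaces once the schema is serialized (`Schema.toString` in the trace above), not when it is first built.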
   
   As an experiment, I changed the clustering logic to read the base files through a Spark dataframe, which avoids this exception and works well for COW tables. However, it cannot handle MOR tables, since those also require reading log files. So I'm raising it here to seek help from the community.
   
   My change:
   
   ```java
     private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                   List<ClusteringOperation> clusteringOps) {
       SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
       HoodieWriteConfig writeConfig = getWriteConfig();
   
       // NOTE: It's crucial to make sure that we don't capture the whole "this" object into the
       //       closure, as this might lead to issues attempting to serialize its nested fields
       String[] dataFilePaths = clusteringOps.stream()
           .map(ClusteringOperation::getDataFilePath)
           .toArray(String[]::new);
       SQLContext sqlContext = new SQLContext(jsc.sc());
   
       Dataset<Row> inputFrame;
   
       final String extension = FSUtils.getFileExtension(dataFilePaths[0]);
       if (PARQUET.getFileExtension().equals(extension)) {
         inputFrame = sqlContext.read().format("parquet").load(dataFilePaths);
       } else if (ORC.getFileExtension().equals(extension)) {
         inputFrame = sqlContext.read().format("orc").load(dataFilePaths);
       } else {
         throw new IllegalArgumentException("Unsupported base file format: " + extension);
       }
   
       Tuple2<String, String> avroRecordNameAndNamespace =
           AvroConversionUtils.getAvroRecordNameAndNamespace(writeConfig.getTableName());
       return HoodieSparkUtils.createRdd(inputFrame, avroRecordNameAndNamespace._1, avroRecordNameAndNamespace._2,
               scala.Option.apply(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()))))
           .toJavaRDD().map(record -> transform(record, writeConfig));
     }
   ```
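   
   Since the dataframe path works for COW, one interim idea could be to split the clustering operations and use the dataframe read only for file groups without log files, keeping the existing record-level reader for MOR groups. This is just a sketch of the split logic with a hypothetical `Op` stand-in, not real Hudi types:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical sketch: partition clustering operations by whether the file
   // group carries log files. Groups without log files could go through the
   // dataframe-based read above; groups with log files would still need the
   // existing merged (base + log) reader.
   public class SplitClusteringOps {
   
     static final class Op {
       final String baseFilePath;
       final List<String> logFilePaths;
   
       Op(String baseFilePath, List<String> logFilePaths) {
         this.baseFilePath = baseFilePath;
         this.logFilePaths = logFilePaths;
       }
     }
   
     /** index 0: base-file-only groups; index 1: groups that need log-file merging. */
     static List<List<Op>> splitByLogFiles(List<Op> ops) {
       List<Op> baseOnly = new ArrayList<>();
       List<Op> withLogs = new ArrayList<>();
       for (Op op : ops) {
         if (op.logFilePaths.isEmpty()) {
           baseOnly.add(op);
         } else {
           withLogs.add(op);
         }
       }
       return List.of(baseOnly, withLogs);
     }
   }
   ```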
   
   **Expected behavior**
   
   Clustering should complete successfully on tables written via bulk insert, just as it does for tables written via insert.
   
   **Environment Description**
   
   * Hudi version : 0.11-snapshot
   
   * Spark version :
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) :
   
   * Running on Docker? (yes/no) :
   
   
   
   
   

