rcc1101 commented on issue #11785:
URL: https://github.com/apache/hudi/issues/11785#issuecomment-2294808687

   **Now I get a new exception.**
   
   **I recreated the dump in Hudi**, this time with the option above added, as follows.
   
   Creating the dump:
   ```
   spark-shell --driver-memory 1g --executor-memory 2500m --executor-cores 1 --driver-cores 1 \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
     --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
     --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
     --conf "spark.sql.caseSensitive=true" \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --conf datanucleus.schema.autoCreateTables=true \
     --conf "parquet.avro.write-old-list-structure=false" \
     --conf spark.hadoop.parquet.avro.write-old-list-structure=true \
     --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
     --name ravic
   ```
   Scala code to create the dump:
   ```
   import org.apache.spark.sql.SaveMode
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.common.model.WriteOperationType
   import org.apache.hudi.config.HoodieWriteConfig

   val sess = spark
   // Read the raw parquet snapshot and bulk-insert it into a new Hudi table.
   val snapshotDf = sess.read.parquet("s3://bucket/snapshots/prefix/_bid_9223370313087830657/")
   snapshotDf.createOrReplaceTempView("snapshot")
   snapshotDf.write.format("hudi")
         .option(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.name())
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
         .option(HoodieWriteConfig.TABLE_NAME, "table_hudi_3")
         .option("parquet.avro.write-old-list-structure", "false")
         .option("spark.hadoop.parquet.avro.write-old-list-structure", "true")
         .mode(SaveMode.Overwrite)
         .save("s3://bucket/snapshots-hudi/prefix/snapshot-3/");
   
   :quit
   
   ```
   -- Good so far --
   
   **Now I am trying to update the Hudi snapshot. The update fails.** The update code:
   ```
   import org.apache.spark.sql.SaveMode
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.QuickstartUtils._
   import org.apache.hudi.config.HoodieWriteConfig

   spark.conf.set("parquet.avro.write-old-list-structure", false)
   spark.conf.set("spark.hadoop.parquet.avro.write-old-list-structure", true)
   val sess = spark
   // Reuse the snapshot schema when reading the JSON CDC input.
   val snapshotDf = sess.read.parquet("s3://bucket/snapshots/prefix/_bid_9223370313087830657/")
   var cdcDf = sess.read.schema(snapshotDf.schema).format("json").load("s3://bucket/inputs/prefix/*")
   cdcDf.createOrReplaceTempView("cdc")
   // Drop rows that have no record key.
   cdcDf = sess.sql("select * from cdc where _id is not null and _id.oid is not null ")
   cdcDf.write.format("hudi")
         .options(getQuickstartWriteConfigs)
         .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor")
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
         .option("hoodie.datasource.write.recordkey.field", "_id.oid")
         .option("parquet.avro.write-old-list-structure", "false")
         .option("spark.hadoop.parquet.avro.write-old-list-structure", "true")
         .option(HoodieWriteConfig.TABLE_NAME, "table_hudi_3")
         .mode(SaveMode.Append)
         .save("s3://bucket/snapshots-hudi/prefix/snapshot-3/");
   :quit
   ```
   
   New exception during the merge step of the update. `_id.oid` is a grouping attribute, and it comes from a MongoDB dump.
   ```
    Error upserting bucketType UPDATE for partition :2
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:344)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:254)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:911)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:911)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:382)
        at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1388)
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1630)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1540)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1604)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1405)
        at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1359)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:380)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
        at org.apache.spark.scheduler.Task.run(Task.scala:152)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:151)
        at org.apache.hudi.table.HoodieSparkTable.runMerge(HoodieSparkTable.java:148)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:376)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:371)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:337)
        ... 33 more
   Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
        at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
        ... 37 more
   Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
        at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
        at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67)
        ... 38 more
   Caused by: java.lang.ClassCastException: optional binary value (STRING) is not a group
        at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:362)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:306)
        at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:79)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:617)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:567)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:371)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:144)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:98)
        at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:146)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
        at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
   ```
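   
   The root cause in the trace is thrown while parquet-avro builds a converter for a collection element (`AvroRecordConverter$AvroCollectionConverter$ElementConverter`), so the array-typed columns look like a reasonable first place to check. A minimal sketch, assuming a spark-shell session like the ones above; `arrayPaths` is a hypothetical helper, not part of Hudi or Spark, and it only lists where arrays occur in a schema:
   
   ```scala
   import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

   // Hypothetical helper: recursively collect the dotted paths of all
   // array-typed fields in a Spark schema. "[]" marks an array element,
   // "{}" marks a map value.
   def arrayPaths(dt: DataType, prefix: String = ""): Seq[String] = dt match {
     case s: StructType =>
       s.fields.toSeq.flatMap { f =>
         val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
         arrayPaths(f.dataType, path)
       }
     case a: ArrayType => prefix +: arrayPaths(a.elementType, s"$prefix[]")
     case m: MapType   => arrayPaths(m.valueType, s"$prefix{}")
     case _            => Seq.empty
   }
   ```
   
   In the update session it could be called as `arrayPaths(snapshotDf.schema).foreach(println)`, and again on `cdcDf.schema`, to see which array columns the merge has to read back.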

