rcc1101 commented on issue #11785:
URL: https://github.com/apache/hudi/issues/11785#issuecomment-2294808687
**Got a new exception now.**
**I recreated the dump in Hudi again**, this time with the option above added, as follows.

Creating the dump:
```
spark-shell --driver-memory 1g --executor-memory 2500m --executor-cores 1 \
  --driver-cores 1 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
  --conf "spark.sql.caseSensitive=true" \
  --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
  --conf datanucleus.schema.autoCreateTables=true \
  --conf "parquet.avro.write-old-list-structure=false" \
  --conf spark.hadoop.parquet.avro.write-old-list-structure=true \
  --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 --name ravic
```
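Side note: the two list-structure flags above carry different values (`false` unprefixed, `true` with the `spark.hadoop.` prefix), and only the `spark.hadoop.`-prefixed one is copied into the Hadoop configuration that parquet-avro reads; spark-shell warns about and drops plain `--conf` keys that do not start with `spark.`. A quick sanity check inside the shell:

```
// Prints the value parquet-avro will actually see; expect "true" here,
// since only the spark.hadoop.-prefixed --conf reaches hadoopConfiguration.
println(spark.sparkContext.hadoopConfiguration.get("parquet.avro.write-old-list-structure"))
```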
Scala code to create the dump:
```
import org.apache.spark.sql.SaveMode
// Imports needed for the Hudi option constants used below
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.config.HoodieWriteConfig

val sess = spark
val snapshotDf = sess.read.parquet("s3://bucket/snapshots/prefix/_bid_9223370313087830657/")
snapshotDf.createOrReplaceTempView("snapshot") // registerTempTable is deprecated
snapshotDf.write.format("hudi")
  .option(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.name())
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
  .option(HoodieWriteConfig.TABLE_NAME, "table_hudi_3")
  .option("parquet.avro.write-old-list-structure", "false")
  .option("spark.hadoop.parquet.avro.write-old-list-structure", "true")
  .mode(SaveMode.Overwrite)
  .save("s3://bucket/snapshots-hudi/prefix/snapshot-3/")
:quit
```
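Not in the original report, but one way to confirm which list layout the bulk insert actually wrote is to dump the footer schema of one of the produced data files; the file name below is a placeholder. A minimal sketch:

```
// Hypothetical inspection snippet: prints the physical Parquet MessageType of
// one Hudi data file. The old two-level repeated layout and the modern
// three-level list/element layout are directly visible in the output.
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val dataFile = new Path("s3://bucket/snapshots-hudi/prefix/snapshot-3/<one-of-the-parquet-files>") // placeholder
val reader = ParquetFileReader.open(HadoopInputFile.fromPath(dataFile, spark.sparkContext.hadoopConfiguration))
try println(reader.getFooter.getFileMetaData.getSchema)
finally reader.close()
```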
-- Good so far --
**Now I am trying to update the Hudi snapshot. The update fails.** The update code:
```
import org.apache.spark.sql.SaveMode
// Imports needed for the option constants and quickstart helpers used below
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig

spark.conf.set("parquet.avro.write-old-list-structure", false)
spark.conf.set("spark.hadoop.parquet.avro.write-old-list-structure", true)
val sess = spark
val snapshotDf = sess.read.parquet("s3://bucket/snapshots/prefix/_bid_9223370313087830657/")
var cdcDf = sess.read.schema(snapshotDf.schema).format("json").load("s3://bucket/inputs/prefix/*")
cdcDf.createOrReplaceTempView("cdc")
cdcDf = sess.sql("select * from cdc where _id is not null and _id.oid is not null")
cdcDf.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option("hoodie.datasource.write.recordkey.field", "_id.oid")
  .option("parquet.avro.write-old-list-structure", "false")
  .option("spark.hadoop.parquet.avro.write-old-list-structure", "true")
  .option(HoodieWriteConfig.TABLE_NAME, "table_hudi_3")
  .mode(SaveMode.Append)
  .save("s3://bucket/snapshots-hudi/prefix/snapshot-3/")
:quit
```
The new exception, raised when the update triggers a merge, is below. `_id.oid` is a grouping attribute and it comes from a MongoDB dump.
```
Error upserting bucketType UPDATE for partition :2
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:344)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:254)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:911)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:911)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
  at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:382)
  at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1388)
  at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1630)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1540)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1604)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1405)
  at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1359)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:380)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
  at org.apache.spark.scheduler.Task.run(Task.scala:152)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:151)
  at org.apache.hudi.table.HoodieSparkTable.runMerge(HoodieSparkTable.java:148)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:376)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:371)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:337)
  ... 33 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
  ... 37 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
  at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
  at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67)
  ... 38 more
Caused by: java.lang.ClassCastException: optional binary value (STRING) is not a group
  at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:362)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:306)
  at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:79)
  at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:617)
  at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:567)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:371)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:144)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:98)
  at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
  at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:146)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
  at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
```
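Not part of the original report, but for anyone hitting the same `optional binary value (STRING) is not a group` cast: it typically means the Avro read schema used during the merge expects a group (record or list element) where the file's physical schema has a plain string, i.e. the writer and the merge-time reader disagree on the list layout. A minimal sketch of pinning one value on the shared Hadoop conf before running both the bulk insert and the upsert (assuming `false`, the three-level layout, is the value you want everywhere):

```
// Sketch (assumption): setting the flag on the SparkContext's Hadoop conf
// makes it visible to parquet-avro in Hudi's writer and merge reader alike,
// so every file in the table ends up with the same list layout.
spark.sparkContext.hadoopConfiguration
  .set("parquet.avro.write-old-list-structure", "false")
```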