voonhous opened a new issue, #17978:
URL: https://github.com/apache/hudi/issues/17978
### Bug Description
**What happened:**
A `ClassCastException` is thrown when trying to perform an UPSERT into a
bootstrapped table.
**What you expected:**
UPSERT should succeed.
**Steps to reproduce:**
```shell
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /var/hoodie/ws/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar \
  --table-type COPY_ON_WRITE \
  --base-file-format PARQUET \
  --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_cow_bs \
  --target-table stock_ticks_cow_bs \
  --props /var/demo/config/dfs-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --enable-hive-sync \
  --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor \
  --hoodie-conf hoodie.datasource.hive_sync.username=hive \
  --hoodie-conf hoodie.datasource.hive_sync.password=hive \
  --hoodie-conf hoodie.datasource.hive_sync.partition_fields=dt \
  --hoodie-conf hoodie.datasource.hive_sync.database=default \
  --hoodie-conf hoodie.datasource.hive_sync.table=stock_ticks_cow_bs
```
The command above is executed by
`org.apache.hudi.integ.ITTestHoodieDemo#ingestSecondBatchAndHiveSync` and fails
with the error captured in the stack trace below.
### Environment
**Hudi version:** 1.2.0-SNAPSHOT
**Query engine:** Spark
**Relevant configs:** See the `--hoodie-conf` options in the reproduction command above.
### Logs and Stack Trace
```
org.opentest4j.AssertionFailedError: Command failed with exit code 1.
Stderr: Exception in thread "main" org.apache.hudi.exception.HoodieException: Failed to run HoodieStreamer
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:652)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 32) (adhoc-1 executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:365)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$e664f7e$1(BaseSparkCommitActionExecutor.java:298)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:381)
    at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1372)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class java.lang.Integer cannot be cast to class org.apache.avro.util.Utf8 (java.lang.Integer is in module java.base of loader 'bootstrap'; org.apache.avro.util.Utf8 is in unnamed module of loader 'app')
    at org.apache.avro.util.Utf8.compareTo(Utf8.java:36)
    at org.apache.hudi.common.table.read.BufferedRecordMergerFactory.shouldKeepNewerRecord(BufferedRecordMergerFactory.java:492)
    at org.apache.hudi.common.table.read.BufferedRecordMergerFactory.access$100(BufferedRecordMergerFactory.java:41)
    at org.apache.hudi.common.table.read.BufferedRecordMergerFactory$EventTimeRecordMerger.finalMerge(BufferedRecordMergerFactory.java:193)
    at org.apache.hudi.common.table.read.buffer.FileGroupRecordBuffer.hasNextBaseRecord(FileGroupRecordBuffer.java:241)
    at org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer.hasNextBaseRecord(KeyBasedFileGroupRecordBuffer.java:138)
    at org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer.doHasNext(KeyBasedFileGroupRecordBuffer.java:147)
    at org.apache.hudi.common.table.read.buffer.FileGroupRecordBuffer.hasNext(FileGroupRecordBuffer.java:152)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:247)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:334)
    at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
    at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:270)
    at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
    ... 35 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
    at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at org.apache.hudi.client.SparkRDDWriteClient$SlimWriteStats.from(SparkRDDWriteClient.java:441)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:125)
    at org.apache.hudi.utilities.streamer.StreamSync.writeToSinkAndDoMetaSync(StreamSync.java:869)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:521)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:917)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:228)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:649)
    ... 12 more
```
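For context on the root cause: the trace shows `BufferedRecordMergerFactory.shouldKeepNewerRecord` comparing two ordering values of different runtime types, one materialized as an Avro `Utf8` string and the other as a `java.lang.Integer`, with `Utf8.compareTo` rejecting the mismatch. A minimal standalone sketch of that failure mode (illustrative only, not Hudi code; the class name and values are made up):

```java
import org.apache.avro.util.Utf8;

public class OrderingValueMismatch {

    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) {
        // Ordering value as read back from one side of the merge: an Avro string.
        Comparable existing = new Utf8("1602150");

        // Ordering value from the other side: boxed as an Integer.
        Comparable incoming = 1602150;

        // Utf8 implements Comparable<Utf8>, so its bridge method casts the
        // argument to Utf8. Passing an Integer throws the same
        // ClassCastException seen in the stack trace above.
        existing.compareTo(incoming);
    }
}
```

Since the source-ordering field here is `ts`, this suggests the bootstrapped base file path and the incoming batch materialize the ordering field under different schemas, though I have not confirmed where the divergence happens.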