zyclove opened a new issue, #8906:
URL: https://github.com/apache/hudi/issues/8906
**Describe the problem you faced**
**To Reproduce**
hudi 0.9 upgrade hudi 0.12.3
data cannot be upsert into old table.
code:
`jsonDataSet.write()
.format("org.apache.hudi")
.option(HoodieTableConfig.TYPE.key(),
HoodieTableType.MERGE_ON_READ.name())
.option(DataSourceWriteOptions.OPERATION().key(),
WriteOperationType.UPSERT.value())
.option(DataSourceWriteOptions.TABLE_TYPE().key(),
HoodieTableType.MERGE_ON_READ.name())
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
config.getIdName())
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
Constants.DT)
.option(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true)
.option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
Constants.UPDATE_TIME)
.option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), false)
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(),
200)
.option(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), 200)
.option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(),
DefaultHoodieRecordPayload.class.getName())
.option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(),
false)
.option(HoodieWriteConfig.MARKERS_TYPE.key(),
MarkerType.DIRECT.toString())
.option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), true)
.option(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(),
DefaultHoodieRecordPayload.class.getName())
.option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(),
dataKeepTime)
.option(HoodieCleanConfig.AUTO_CLEAN.key(), true)
.option(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE.key(), true)
.option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(),
dataKeepTime + 1)
.option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(),
dataKeepTime * 3)
.option(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB.key(), 500 * 1024)
.option(HoodieCleanConfig.CLEANER_POLICY.key(),
HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name())
.option(HoodieCleanConfig.CLEANER_HOURS_RETAINED.key(), 72)
.option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 128 * 1024 *
1024)
.option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 256
* 1024 * 1024)
.option(HoodieCompactionConfig.INLINE_COMPACT.key(), true)
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 0)
.option(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
tempPath)
.option(HoodieMetadataConfig.ENABLE.key(), true)
.option(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP.key(),
dataKeepTime + 1)
.option(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP.key(),
dataKeepTime + 2)
.option(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.key(),
dataKeepTime)
.option(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),
maxUseMemory)
.option(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION.key(),
maxUseMemory)
.option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY,
Constants.UPDATE_TIME)
.option(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
Constants.UPDATE_TIME)
.option(HoodieTableConfig.NAME.key(), config.getName())
.option(HoodieIndexConfig.INDEX_TYPE.key(), BUCKET.name())
.option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 20)
.option(HoodieLayoutConfig.LAYOUT_TYPE.key(), BUCKET.name())
.option(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key(),
SparkBucketIndexPartitioner.class.getName())
.option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
.option(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name())
.option("hoodie.write.lock.provider",
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
.mode(SaveMode.Append)
.save(config.getSinkPath());`
it seems do not use
org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner.
Steps to reproduce the behavior:
1. upgrade hudi to 0.12.3
2. upsert data
**Expected behavior**
A clear and concise description of what you expected to happen.
**Environment Description**
* Hudi version :0.12.3
* Spark version :3.2.1
* Hive version :2.3.9
* Hadoop version :3.2.1
* Storage (HDFS/S3/GCS..) :s3
* Running on Docker? (yes/no) :no
**Additional context**
Add any other context about the problem here.
**Stacktrace**
```Add the stacktrace of the error.```
23/06/06 10:26:33 ERROR HuDiMergeFunction: upsert error,
config:{"apiTimeOut":5000,"bloomIndexByRange":false,"checkPointPath":"s3a://ind-tuya-big-data/hudi/checkpoint/ods_auto_test_task_log","chinaTime":false,"dataKafkaServer":"172.27.236.31:9093,172.27.249.115:9093,172.27.156.124:9093","delta":false,"dev":false,"driverClass":"com.tuya.common.Driver","hourPt":false,"idName":"id","initTable":false,"jobId":258,"kafkaServer":"172.27.229.60:9093,172.27.157.141:9093,172.27.245.83:9093","level":"hight","logLevel":"warn","lossFailed":false,"maxAge":"3d","maxFileNum":200,"mqData":false,"name":"ods_auto_test_task_log","parallel":false,"partitions":8,"pollTimeOut":2000,"region":"IND","sinkPath":"s3a://ind-tuya-big-data/hudi/tables/ods_auto_test_task_log","startingOffsets":"latest","tableAlias":"tuya_testbook_ng.auto_test_task_log","topic":"yugongx-mysql-binlog-testbook_ng","triggerNum":40000000,"triggerOnce":false,"triggerTime":20,"type":"binlog","yearDt":false},
error:{}
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit
time 20230606102610614
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:70)
at
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)
at
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:94)
at
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:81)
at
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:159)
at
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:343)
at
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:149)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: java.lang.NumberFormatException: For input string: "d880d4ea"
at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at
org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:84)
at
org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.lambda$loadPartitionBucketIdFileIdMapping$0(HoodieSimpleBucketIndex.java:59)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at
org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.loadPartitionBucketIdFileIdMapping(HoodieSimpleBucketIndex.java:55)
at
org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.access$000(HoodieSimpleBucketIndex.java:39)
at
org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$SimpleBucketIndexLocationMapper.lambda$new$1(HoodieSimpleBucketIndex.java:89)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at
scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:31)
at
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at
org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$SimpleBucketIndexLocationMapper.<init>(HoodieSimpleBucketIndex.java:89)
at
org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.getLocationMapper(HoodieSimpleBucketIndex.java:78)
at
org.apache.hudi.index.bucket.HoodieBucketIndex.tagLocation(HoodieBucketIndex.java:75)
at
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:51)
at
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:34)
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:59)
... 78 more
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]