hoanpvzz opened a new issue, #7152:
URL: https://github.com/apache/hudi/issues/7152
**Version Info**
hudi-utilities-bundle_2.12-0.12.1
Spark 3.1.3
Kafka 3.2
Debezium: 1.9.6 or 2.0
MySQL: 5.7
Java: 11.0.16
**Describe the problem you faced**
I have an existing table in MySQL that needs to be migrated to Hudi:
**_Step 1:_** I migrate the data from the existing MySQL table to Hudi using DeltaStreamer's JDBC source.
**_hoodie-init.properties_**
```
include=hoodie-base.properties
hoodie.deltastreamer.jdbc.url=jdbc:mysql://<ip>:3306/demo
hoodie.deltastreamer.jdbc.user=<user>
hoodie.deltastreamer.jdbc.password=<pass>
hoodie.deltastreamer.jdbc.driver.class=com.mysql.cj.jdbc.Driver
hoodie.deltastreamer.jdbc.table.name=user_info
hoodie.deltastreamer.jdbc.table.incr.column.name=id
hoodie.deltastreamer.jdbc.incr.pull=true
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=age
```
```
spark-submit \
--packages ${SPARK_PACKAGES} \
--jars libs/gcs-connector-hadoop3-latest.jar,libs/mysql-connector-java-8.0.30.jar \
--properties-file ${SPARK_PROPERTIES} \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
./jars/hudi-utilities-bundle_2.12-0.12.1.jar \
--props hoodie-init.properties \
--table-type COPY_ON_WRITE \
--op UPSERT \
--target-base-path ${HUDI_BASE_PATH} \
--target-table ${HUDI_TABLE} \
--source-class org.apache.hudi.utilities.sources.JdbcSource \
--source-ordering-field id
```
The data migrates completely without error.
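For context, my understanding of the incremental pull settings above (`hoodie.deltastreamer.jdbc.incr.pull=true` with `hoodie.deltastreamer.jdbc.table.incr.column.name=id`) is that each round only fetches rows whose `id` is past the last checkpoint. A rough sketch of that behavior (the checkpoint plumbing here is hypothetical; Hudi stores the real checkpoint in the target table's commit metadata):

```python
# Rough sketch (not Hudi's actual implementation) of incremental
# JDBC pulling as configured in hoodie-init.properties.

def build_incr_query(table: str, incr_column: str, last_checkpoint):
    """Build the source query for one DeltaStreamer round."""
    if last_checkpoint is None:
        # First run: full scan of the source table.
        return f"SELECT * FROM {table}"
    # Subsequent runs: only rows past the last pulled value.
    return f"SELECT * FROM {table} WHERE {incr_column} > {last_checkpoint}"

print(build_incr_query("user_info", "id", None))
print(build_incr_query("user_info", "id", 100))
```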
**_Step 2:_** I sync new changes from MySQL to Hudi via Debezium and Kafka.
**_hoodie-debezium.properties_**
```
# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=hoodie-base.properties
# Kafka
bootstrap.servers=kafka:9092
auto.offset.reset=earliest
schema.registry.url=http://schema-registry:8081
hoodie.deltastreamer.schemaprovider.registry.url=http://schema-registry:8081/subjects/demo-user-info.demo.user_info-value/versions/latest
hoodie.deltastreamer.source.kafka.topic=demo-user-info.demo.user_info
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=age
```
```
spark-submit \
--packages ${SPARK_PACKAGES} \
--jars libs/gcs-connector-hadoop3-latest.jar \
--properties-file ${SPARK_PROPERTIES} \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
./jars/hudi-utilities-bundle_2.12-0.12.1.jar \
--continuous \
--min-sync-interval-seconds 10 \
--table-type COPY_ON_WRITE \
--op UPSERT \
--target-base-path ${HUDI_BASE_PATH} \
--target-table ${HUDI_TABLE} \
--source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \
--source-ordering-field _event_origin_ts_ms \
--payload-class org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload \
--props hoodie-debezium.properties
```
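As I understand it, `MySqlDebeziumAvroPayload` decides which record wins a merge by comparing a per-event sequence value (`_event_seq`) derived from the binlog coordinates, i.e. the `_event_bin_file` and `_event_pos` fields that the Debezium source adds. One way to model that ordering (an assumption for illustration, not Hudi's actual code):

```python
# Sketch (assumption, not Hudi's actual code): model the MySQL event
# seq as (binlog file number, position) and pick the newer record.

def to_seq(bin_file: str, pos: int):
    """Map binlog coordinates to an orderable value."""
    file_no = int(bin_file.split(".")[-1])  # "mysql-bin.000003" -> 3
    return (file_no, pos)

def pick_current(old_seq, new_seq) -> bool:
    """Keep the incoming record only if its seq is strictly newer."""
    return new_seq > old_seq

print(pick_current(to_seq("mysql-bin.000003", 154),
                   to_seq("mysql-bin.000004", 7)))
```

Tuple comparison orders first by binlog file, then by position within the file, which matches how binlog coordinates advance.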
**Result**
- New records are inserted into Hudi successfully.
- Update events fail to update the existing data, with this error log:
```
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=2 partitionPath=female}, currentLocation='HoodieRecordLocation {instantTime=20221106165556313, fileId=1cbbe4eb-fad2-4b3b-8748-c3712fa4bee1-0}', newLocation='HoodieRecordLocation {instantTime=20221106180405002, fileId=1cbbe4eb-fad2-4b3b-8748-c3712fa4bee1-0}'}}, old value {{"_hoodie_commit_time": "20221106165556313", "_hoodie_commit_seqno": "20221106165556313_0_0", "_hoodie_record_key": "2", "_hoodie_partition_path": "female", "_hoodie_file_name": "1cbbe4eb-fad2-4b3b-8748-c3712fa4bee1-0_0-31-27_20221106165556313.parquet", "_change_operation_type": null, "_upstream_event_processed_ts_ms": null, "db_shard_source_partition": null, "_event_origin_ts_ms": null, "_event_bin_file": null, "_event_pos": null, "_event_row": null, "id": 2, "first_name": "John", "last_name": "Ash", "age": 20, "sex": "female", "_event_seq": null}}
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:362)
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    ... 3 more
Caused by: java.lang.NullPointerException
    at org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload.extractSeq(MySqlDebeziumAvroPayload.java:56)
    at org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload.shouldPickCurrentRecord(MySqlDebeziumAvroPayload.java:61)
    at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.combineAndGetUpdateValue(AbstractDebeziumAvroPayload.java:66)
    at org.apache.hudi.common.model.HoodieRecordPayload.combineAndGetUpdateValue(HoodieRecordPayload.java:83)
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:343)
    ... 8 more
```
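Note that in the logged "old value" all the Debezium metadata columns, including `_event_seq`, are null, because that record was written by the Step 1 JDBC bootstrap, not by the Debezium source. So when `MySqlDebeziumAvroPayload.shouldPickCurrentRecord` tries to extract the seq from the stored record to compare it with the incoming event, the null triggers the NPE. A minimal model of that failure mode (illustrative Python, not Hudi's actual code; the `AttributeError` plays the role of the Java NPE):

```python
# Minimal model (not Hudi's actual code) of the merge failure: the
# stored record came from the JDBC bootstrap, so its Debezium metadata
# columns are null, and extracting its seq blows up during the merge.

old_record = {"id": 2, "first_name": "John", "_event_seq": None}      # bootstrapped via JdbcSource
new_record = {"id": 2, "first_name": "Johnny", "_event_seq": "3.154"}  # from Debezium

def extract_seq(record):
    # Assumes _event_seq is non-null; raises on the bootstrapped
    # record, analogous to MySqlDebeziumAvroPayload.extractSeq.
    return record["_event_seq"].split(".")  # AttributeError when None

def should_pick_current(old, new):
    return extract_seq(new) > extract_seq(old)

try:
    should_pick_current(old_record, new_record)
except AttributeError:
    print("cannot compare: bootstrapped record has no _event_seq")
```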