hoanpvzz opened a new issue, #7152:
URL: https://github.com/apache/hudi/issues/7152

   **Version Info**
   hudi-utilities-bundle_2.12-0.12.1
   Spark 3.1.3
   Kafka 3.2
   Debezium: 1.9.6 or 2.0
   MySQL: 5.7
   Java: 11.0.16
   
   **Describe the problem you faced**
   
   I have an existing table in MySQL that I need to migrate to Hudi:
   
   **_Step 1:_**  I migrate the data from the existing MySQL table to Hudi using the DeltaStreamer JDBC source.
   
   **_hoodie-init.properties_**
   
   ```
   include=hoodie-base.properties
   
   hoodie.deltastreamer.jdbc.url=jdbc:mysql://<ip>:3306/demo
   hoodie.deltastreamer.jdbc.user=<user>
   hoodie.deltastreamer.jdbc.password=<pass>
   hoodie.deltastreamer.jdbc.driver.class=com.mysql.cj.jdbc.Driver
   hoodie.deltastreamer.jdbc.table.name=user_info
   hoodie.deltastreamer.jdbc.table.incr.column.name=id
   hoodie.deltastreamer.jdbc.incr.pull=true
   
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=age
   ```
   
   
   ```
   spark-submit \
     --packages ${SPARK_PACKAGES} \
     --jars libs/gcs-connector-hadoop3-latest.jar,libs/mysql-connector-java-8.0.30.jar \
     --properties-file ${SPARK_PROPERTIES} \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     ./jars/hudi-utilities-bundle_2.12-0.12.1.jar \
     --props hoodie-init.properties \
     --table-type COPY_ON_WRITE \
     --op UPSERT \
     --target-base-path ${HUDI_BASE_PATH} \
     --target-table ${HUDI_TABLE} \
     --source-class org.apache.hudi.utilities.sources.JdbcSource \
     --source-ordering-field id
   ```
   The data migrates completely without error.
   
   
   **_Step 2:_** I sync new change data from MySQL through Debezium and Kafka to Hudi.
   **_hoodie-debezium.properties_**
   ```
   # Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
   
   include=hoodie-base.properties
   
   # Kafka
   bootstrap.servers=kafka:9092
   auto.offset.reset=earliest
   schema.registry.url=http://schema-registry:8081
   
   
   hoodie.deltastreamer.schemaprovider.registry.url=http://schema-registry:8081/subjects/demo-user-info.demo.user_info-value/versions/latest
   hoodie.deltastreamer.source.kafka.topic=demo-user-info.demo.user_info
   
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=age
   ```
   
   ```
   spark-submit \
     --packages ${SPARK_PACKAGES} \
     --jars libs/gcs-connector-hadoop3-latest.jar \
     --properties-file ${SPARK_PROPERTIES} \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     ./jars/hudi-utilities-bundle_2.12-0.12.1.jar \
     --continuous \
     --min-sync-interval-seconds 10 \
     --table-type COPY_ON_WRITE \
     --op UPSERT \
     --target-base-path ${HUDI_BASE_PATH} \
     --target-table ${HUDI_TABLE} \
     --source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \
     --source-ordering-field _event_origin_ts_ms \
     --payload-class org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload \
     --props hoodie-debezium.properties
   ```
   
   **Result**
   - New records are inserted into Hudi successfully.
   - Update events fail to update the existing data, with this error log:
   
   ```
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=2 partitionPath=female}, currentLocation='HoodieRecordLocation {instantTime=20221106165556313, fileId=1cbbe4eb-fad2-4b3b-8748-c3712fa4bee1-0}', newLocation='HoodieRecordLocation {instantTime=20221106180405002, fileId=1cbbe4eb-fad2-4b3b-8748-c3712fa4bee1-0}'}}, old value {{"_hoodie_commit_time": "20221106165556313", "_hoodie_commit_seqno": "20221106165556313_0_0", "_hoodie_record_key": "2", "_hoodie_partition_path": "female", "_hoodie_file_name": "1cbbe4eb-fad2-4b3b-8748-c3712fa4bee1-0_0-31-27_20221106165556313.parquet", "_change_operation_type": null, "_upstream_event_processed_ts_ms": null, "db_shard_source_partition": null, "_event_origin_ts_ms": null, "_event_bin_file": null, "_event_pos": null, "_event_row": null, "id": 2, "first_name": "John", "last_name": "Ash", "age": 20, "sex": "female", "_event_seq": null}}
           at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:362)
           at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
           at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
           at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
           ... 3 more
   Caused by: java.lang.NullPointerException
           at org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload.extractSeq(MySqlDebeziumAvroPayload.java:56)
           at org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload.shouldPickCurrentRecord(MySqlDebeziumAvroPayload.java:61)
           at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.combineAndGetUpdateValue(AbstractDebeziumAvroPayload.java:66)
           at org.apache.hudi.common.model.HoodieRecordPayload.combineAndGetUpdateValue(HoodieRecordPayload.java:83)
           at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:343)
           ... 8 more
   ```
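   The trace points at `MySqlDebeziumAvroPayload.extractSeq`: the stored row was written by the JDBC bootstrap in Step 1, so its Debezium metadata columns (note `"_event_seq": null` in the old value above) were never populated, and the payload NPEs when it tries to parse that null seq during the merge. The following is a hypothetical sketch (not Hudi's actual code; `SeqCompareSketch`, `extractSeqOrNull`, and `shouldPickCurrent` are illustrative names) of a null-safe version of that ordering check, where a stored row without a seq is treated as older than any incoming CDC event:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class SeqCompareSketch {
       // Mirrors what extractSeq does (parse the "_event_seq" string into a
       // comparable number), but returns null instead of throwing when the
       // field is absent — the case for JDBC-bootstrapped rows.
       static Double extractSeqOrNull(Map<String, Object> record) {
           Object seq = record.get("_event_seq");
           return seq == null ? null : Double.valueOf(seq.toString());
       }

       // Pick the incoming record when the stored record carries no seq,
       // i.e. treat bootstrapped rows without Debezium metadata as oldest.
       static boolean shouldPickCurrent(Map<String, Object> incoming,
                                        Map<String, Object> stored) {
           Double storedSeq = extractSeqOrNull(stored);
           Double incomingSeq = extractSeqOrNull(incoming);
           if (storedSeq == null) {
               return true; // bootstrapped row: always accept the CDC update
           }
           return incomingSeq != null && incomingSeq >= storedSeq;
       }

       public static void main(String[] args) {
           Map<String, Object> stored = new HashMap<>();   // JDBC-bootstrapped row
           stored.put("_event_seq", null);
           Map<String, Object> incoming = new HashMap<>(); // Debezium update event
           incoming.put("_event_seq", "1.42");
           System.out.println(shouldPickCurrent(incoming, stored)); // prints true
       }
   }
   ```

   Short of patching the payload, one direction would be to make the bootstrapped rows carry the same metadata columns the Debezium source writes, so every stored record has a non-null `_event_seq` before the CDC stream starts upserting.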

