[
https://issues.apache.org/jira/browse/FLINK-24533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17438129#comment-17438129
]
Jark Wu commented on FLINK-24533:
---------------------------------
Hi [~licp], from the log in your "Specify_some_key.png", it seems the changelog
generated by Flink is correct; however, the last Put is ignored by HBase. This
may be related to HBASE-8626: when a Delete and a Put have the same timestamp,
which one takes effect is not deterministic. You can see that we generate Put
and Delete using the default Long.MAX_VALUE as the timestamp value [1]. So a fix
should be simple, following this idea [2]: we need to pass in a strictly
increasing timestamp value when constructing Put and Delete.
[1]: org.apache.flink.connector.hbase.util.HBaseSerde#createPutMutation
[2]: https://issues.apache.org/jira/browse/HBASE-8626?focusedCommentId=13669455&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13669455
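
To illustrate the "strictly increasing timestamp" idea from the comment above, here is a minimal sketch. It is not the actual Flink fix: the class name StrictlyIncreasingTimestamp and its next() method are hypothetical, and the sketch assumes a single writer (the report uses parallelism 1). It only shows how to hand out timestamps that never repeat, even when two mutations fall in the same millisecond.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of Flink or HBase): hands out timestamps that
 * are strictly increasing even if the clock stalls or steps backwards.
 */
final class StrictlyIncreasingTimestamp {
    private final AtomicLong last = new AtomicLong(Long.MIN_VALUE);
    private final LongSupplier clock;

    StrictlyIncreasingTimestamp() {
        this(System::currentTimeMillis);
    }

    // The clock is injectable so the behaviour can be checked deterministically.
    StrictlyIncreasingTimestamp(LongSupplier clock) {
        this.clock = clock;
    }

    long next() {
        long now = clock.getAsLong();
        // Use the clock value if it moved forward; otherwise bump the last value by one.
        return last.updateAndGet(prev -> now > prev ? now : prev + 1);
    }

    public static void main(String[] args) {
        // A frozen clock simulates a Delete and a Put issued in the same millisecond.
        StrictlyIncreasingTimestamp ts = new StrictlyIncreasingTimestamp(() -> 1000L);
        long deleteTs = ts.next();
        long putTs = ts.next();
        // With the HBase client, these would be passed as new Delete(rowkey, deleteTs)
        // and new Put(rowkey, putTs) instead of relying on the default timestamp.
        System.out.println(deleteTs + " < " + putTs); // prints "1000 < 1001"
    }
}
```

Because the later Put always carries a larger timestamp than the preceding Delete, the tie described in HBASE-8626 cannot occur for mutations produced by the same generator. The trade-off is that under bursts the assigned timestamps can run slightly ahead of wall-clock time.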
> Flink SQL Upsert To Hbase Appear data loss
> ---------------------------------------------
>
> Key: FLINK-24533
> URL: https://issues.apache.org/jira/browse/FLINK-24533
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / HBase
> Affects Versions: 1.12.0
> Environment: Flink 1.12.0 on Yarn
> HBase1.4
> Reporter: licp
> Priority: Major
> Attachments: Check_Result.png, Specify_some_key.png
>
>
> Data flow direction is described below:
> Source:Mysql
> Sink:HBase
> Parallelism:1
> -------------------Code Example One Using Left Join -------------------
> -- MySQL source, total records: 4829
> create table user(
> user_id string,
> user_name string,
> primary key(user_id) not enforced
> )with(
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3308',
> 'username' = 'user_name',
> 'password' = '******',
> 'database-name' = 'database_name',
> 'table-name' = 'table_name',
> 'debezium.event.processing.failure.handling.mode' = 'warn',
> 'debezium.snapshot.locking.mode' = 'none'
> );
> create table user_profile(
> user_id string,
> age int,
> primary key(user_id) not enforced
> )with(
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3308',
> 'username' = 'user_name',
> 'password' = '******',
> 'database-name' = 'database_name',
> 'table-name' = 'table_name',
> 'debezium.event.processing.failure.handling.mode' = 'warn',
> 'debezium.snapshot.locking.mode' = 'none'
> );
> -- HBase sink, total records: 4826
> create table real_dwd_user_info_to_hbase(
> rowkey string,
> f ROW(user_name string,age int)
> )with(
> 'connector' = 'hbase-1.4',
> 'table-name' = 'table_name',
> 'zookeeper.quorum' = 'zk',
> 'zookeeper.znode.parent' = '/hbase'
> );
> insert into real_dwd_user_info_to_hbase
> select
> u.user_id,
> row(u.user_name,up.age) as f
> from user u
> left join user_profile up
> on u.user_id = up.user_id
> where u.user_id<1000000
> ;
> ----------Code Example Two Using Left Join And Specify some key---------
> insert into real_dwd_user_info_to_hbase
> select
> u.user_id,
> row(u.user_name,up.age) as f
> from user u
> left join user_profile up
> on u.user_id = up.user_id
> where u.user_id=0
> -------------------------------------------------------------------------------
> When I print the results of the same query logic, the specified key shows three
> changelog records, "+I, -D, +I", but the key still cannot be found in HBase.