[
https://issues.apache.org/jira/browse/FLINK-32139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Martijn Visser closed FLINK-32139.
----------------------------------
Fix Version/s: 1.18.0
Assignee: LiuZeshan
Resolution: Fixed
Fixed in apache/flink:master via:
fe2ef22ff049e88774e57bb9a3ce81a0215ffc52
a7a16484cc678edbd0bb49ab61f111702920ac6c
8d3b74f5588faeb406cad8693398ab914d5ff354
> Data accidentally deleted and not deleted when upsert sink to hbase
> -------------------------------------------------------------------
>
> Key: FLINK-32139
> URL: https://issues.apache.org/jira/browse/FLINK-32139
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase
> Reporter: LiuZeshan
> Assignee: LiuZeshan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: aa.log, image-2023-05-24-23-07-23-978.png,
> image-2023-05-24-23-16-59-508.png
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> h4. *Problem background*
> We encountered issues where data was accidentally deleted, or not deleted at
> all, when synchronizing MySQL CDC data to HBase using the HBase connector.
> h3. Reproduction steps
> 1. A Flink job with parallelism 1 synchronizes a MySQL table into HBase.
> SinkUpsertMaterializer is turned off by setting
> {{table.exec.sink.upsert-materialize = 'NONE'}}.
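> (As a side note, here is a minimal sketch of applying the same setting
> programmatically; it assumes an existing {{TableEnvironment}} named
> {{tEnv}}.)
> {code:java}
> // Equivalent to: SET 'table.exec.sink.upsert-materialize' = 'NONE';
> // tEnv is an existing org.apache.flink.table.api.TableEnvironment.
> tEnv.getConfig().set("table.exec.sink.upsert-materialize", "NONE");
> {code}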
> The MySQL table schema is as follows.
> {code:java}
> CREATE TABLE `source_sample_1001` (
> `id` int(11) NOT NULL AUTO_INCREMENT,
> `name` varchar(200) DEFAULT NULL,
> `age` int(11) DEFAULT NULL,
> `weight` float DEFAULT NULL,
> PRIMARY KEY (`id`)
> );{code}
> The source table definition in Flink is as follows.
> {code:java}
> CREATE TABLE `source_sample_1001` (
> `id` bigint,
> `name` String,
> `age` bigint,
> `weight` float,
> PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc' ,
> 'hostname' = '${ip}',
> 'port' = '3306',
> 'username' = '${user}',
> 'password' = '${password}',
> 'database-name' = 'testdb_0010',
> 'table-name' = 'source_sample_1001'
> );{code}
> The HBase sink table is created in the {{testdb_0011}} namespace.
> {code:java}
> CREATE 'testdb_0011:source_sample_1001', 'data'
>
> describe 'testdb_0011:source_sample_1001'
>
> # describe output
> Table testdb_0011:source_sample_1001 is ENABLED
> testdb_0011:source_sample_1001
> COLUMN FAMILIES DESCRIPTION
> {NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1',
> KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE',
> COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0',
> BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
> {code}
>
> The sink table definition in Flink is as follows.
> {code:java}
> CREATE TABLE `hbase_sink1` (
> `id` STRING COMMENT 'unique id',
> `data` ROW<
> `name` string,
> `age` string,
> `weight` string
> >,
> primary key(`id`) not enforced
> ) WITH (
> 'connector' = 'hbase-2.2',
> 'table-name' = 'testdb_0011:source_sample_1001',
> 'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
> );{code}
> The DML in Flink to synchronize the data is as follows.
> {code:java}
> INSERT INTO `hbase_sink1` SELECT
>   `id`, row(`name`, `age`, `weight`)
> FROM (
>   SELECT
>     REVERSE(CONCAT_WS('', CAST(id AS VARCHAR))) as id,
>     `name`, cast(`age` as varchar) as `age`,
>     cast(`weight` as varchar) as `weight`
>   FROM `source_sample_1001`
> ) t;{code}
> 2. Another Flink job sinks datagen data into the MySQL table
> {{source_sample_1001}}. The id ranges from 1 to 10_000, which means
> {{source_sample_1001}} will hold at most 10_000 records.
> {code:java}
> CREATE TABLE datagen_source (
> `id` int,
> `name` String,
> `age` int,
> `weight` int
> ) WITH (
> 'connector' = 'datagen',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '10000',
> 'fields.name.length' = '20',
> 'fields.age.min' = '1',
> 'fields.age.max' = '150',
> 'fields.weight.min' = '5',
> 'fields.weight.max' = '300',
> 'rows-per-second' = '5000'
> );
>
> CREATE TABLE `source_sample_1001` (
> `id` bigint,
> `name` String,
> `age` bigint,
> `weight` float,
> PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
> 'table-name' = 'source_sample_1001',
> 'username' = '${user}',
> 'password' = '${password}',
> 'sink.buffer-flush.max-rows' = '500',
> 'sink.buffer-flush.interval' = '1s'
> );
>
> -- dml
> INSERT INTO `source_sample_1001` SELECT `id`, `name`, `age`, cast(`weight` as
> float) FROM `datagen_source`;{code}
> 3. A bash script deletes rows from the MySQL table {{source_sample_1001}} in batches of 10.
> {code:java}
> #!/bin/bash
>
> mysql1="mysql -h${ip} -u${user} -p${password}"
> batch=10
>
> for ((i=1; ; i++)); do
>   echo "iteration $i start"
>   for ((j=1; j<=10000; j+=batch)); do
>     $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and id < $((j+batch))"
>   done
>   echo "iteration $i end"
>   sleep 10
> done{code}
> 4. Start the two Flink jobs above and the bash script, then wait for several
> minutes; usually 5 minutes is enough. Note that the deleting bash script is
> necessary to reproduce the problem.
> 5. Stop the bash script and wait for the datagen Flink job to fill the
> MySQL table up with 10_000 records. Then stop the datagen Flink job and
> wait for the HBase sink job to consume all the binlog of the MySQL table
> {{source_sample_1001}}.
> 6. Check the HBase table to observe the data-loss issue. As shown below, 67
> records were lost in one test.
> {code:java}
> hbase(main):006:0> count 'testdb_0011:source_sample_1001'
>
> 9933 row(s)
> Took 0.8724 seconds
>
>
> => 9933{code}
> Pick one of the missing records and check its raw data in HBase.
> {code:java}
> hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
> COLUMN CELL
>
>
> 0 row(s)
> Took 0.0029 seconds
>
>
> hbase(main):009:0> scan 'testdb_0011:source_sample_1001', {RAW => true,
> VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
> ROW COLUMN+CELL
>
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:44.884, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:43.769, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:42.902, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:41.614, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:40.885, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:40.841, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:39.788, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:35.799, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:35.688, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:35.650, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:34.885, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:33.905, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:33.803, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:33.693, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:31.784, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:30.684, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:29.812, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:29.590, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:28.876, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:28.565, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:27.879, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:27.699, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:24.209, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:22.480, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:20.716, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:18.678, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:17.720, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:16.858, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:16.682, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:15.753, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:14.571, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:11.572, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:09.681, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:08.792, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:05.888, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:05.754, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:03.626, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:02.652, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:01.790, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:17:00.986, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:59.797, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:58.982, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:58.781, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:58.626, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:58.149, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:56.610, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:51.655, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:51.458, type=Delete
>
> 24 column=data:name,
> timestamp=2023-05-20T21:16:44.860, type=Delete
>
> 1 row(s)
> Took 0.1466 seconds
> {code}
> 7. Start the bash script again to delete all data from the MySQL table, and
> wait for the HBase sink job to consume all the binlog of the MySQL table
> {{source_sample_1001}}.
> 8. Check the HBase table to observe the non-deletion issue. As shown below,
> 6 records were not deleted in the test.
> {code:java}
> hbase(main):012:0> count 'testdb_0011:source_sample_1001'
> 6 row(s)
> Took 0.5121 seconds
>
>
> => 6{code}
> Check the raw data of a record in HBase.
> {code:java}
> hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
> COLUMN CELL
>
>
> data:name
> timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
>
> 1 row(s)
> Took 0.0037 seconds
>
>
> hbase(main):014:0> scan 'testdb_0011:source_sample_1001', {RAW => true,
> VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
> ROW COLUMN+CELL
>
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:45.728, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:44.693, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:43.854, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:41.721, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:40.763, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:37.872, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:32.573, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:26.811, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:24.310, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:23.508, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:22.788, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:21.746, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:17.761, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:12.610, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:11.909, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:07.846, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:06.901, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:06.758, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:06.569, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:02.689, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:17:00.344, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:59.961, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:59.415, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:58.916, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:58.781, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:58.718, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:58.339, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:56.340, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:55.883, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:55.683, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:55.056, type=Delete
>
> 3668 column=data:name,
> timestamp=2023-05-20T21:16:46.845, type=Delete
>
> 1 row(s)
> Took 0.0457 seconds
> {code}
> h4. *Reason for the problem*
> The [HBase
> connector|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L177]
> uses the [Delete key
> type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L380]
> [without a
> timestamp|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L168],
> which, per the HBase javadoc, will {{delete the latest version of the
> specified column. This is an expensive call in that on the server-side, it
> first does a get to find the latest versions timestamp. Then it adds a
> delete using the fetched cells timestamp}}. This causes the following issues:
> Problem 1: When writing an update, the timestamps the HBase server assigns
> to the -U (delete) and +U (put) mutations may be identical, so the -U delete
> masks the latest version, which is the +U data, resulting in accidental
> deletion. This was also reported in
> https://issues.apache.org/jira/browse/FLINK-28910
> Problem 2: When multiple versions of the HBase data exist, deleting only the
> latest version exposes the earlier versions, resulting in data that appears
> not to be deleted.
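> For illustration, a minimal sketch of the two delete flavors in the HBase
> client API (the row key {{24}} and the {{data:name}} column are taken from
> the reproduction above; this is not the connector's actual code):
> {code:java}
> import org.apache.hadoop.hbase.client.Delete;
> import org.apache.hadoop.hbase.util.Bytes;
>
> class DeleteFlavors {
>     static void example() {
>         byte[] row = Bytes.toBytes("24");
>         byte[] family = Bytes.toBytes("data");
>         byte[] qualifier = Bytes.toBytes("name");
>
>         // Delete key type without a timestamp (what the connector used):
>         // the server first does a get to find the latest version's
>         // timestamp, then deletes only that single version. A put written
>         // with the same timestamp is masked as well (problem 1), and any
>         // older versions become visible again (problem 2).
>         Delete latestVersionOnly = new Delete(row).addColumn(family, qualifier);
>
>         // DeleteColumn key type: removes all versions of the column whose
>         // timestamp is <= the marker's timestamp.
>         Delete allVersions = new Delete(row).addColumns(family, qualifier);
>     }
> }
> {code}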
> h4. *Solution proposal*
> Use the [DeleteColumn key
> type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L322]
> and set a strictly increasing timestamp on each
> [put|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L138]
> and
> [delete|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L170]
> mutation. The delete mutation then removes all versions of the specified
> column with a timestamp less than or equal to the specified one.
> I have tested the proposed solution for several days, and neither the
> accidental-deletion issue nor the non-deletion issue occurred.
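> As an illustration of the proposal, a hedged sketch follows (not the exact
> patch; the {{HBaseSerde}} internals are simplified, and the
> max(now, last + 1) timestamp scheme is an assumption):
> {code:java}
> import java.util.concurrent.atomic.AtomicLong;
> import org.apache.hadoop.hbase.client.Delete;
> import org.apache.hadoop.hbase.client.Put;
>
> class MonotonicMutations {
>     // Strictly increasing timestamps per serde instance; the actual
>     // patch may generate them differently.
>     private final AtomicLong lastTimestamp = new AtomicLong(Long.MIN_VALUE);
>
>     private long nextTimestamp() {
>         return lastTimestamp.updateAndGet(
>                 last -> Math.max(System.currentTimeMillis(), last + 1));
>     }
>
>     Put createPut(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
>         // The put carries a client-generated timestamp instead of a
>         // server-assigned one, so it can never tie with a later delete.
>         return new Put(row).addColumn(family, qualifier, nextTimestamp(), value);
>     }
>
>     Delete createDelete(byte[] row, byte[] family, byte[] qualifier) {
>         // addColumns (DeleteColumn key type) removes every version with a
>         // timestamp <= the given one, so no earlier version is exposed.
>         return new Delete(row).addColumns(family, qualifier, nextTimestamp());
>     }
> }
> {code}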
--
This message was sent by Atlassian Jira
(v8.20.10#820010)