JankoWilliam created FLINK-33566:
------------------------------------
Summary: HBase sql-connector needs overwrite the rowKey
Key: FLINK-33566
URL: https://issues.apache.org/jira/browse/FLINK-33566
Project: Flink
Issue Type: Improvement
Components: Connectors / HBase
Affects Versions: 1.18.0
Environment: flink: 1.18.0
hbase: 2.2.3
flink-connector-hbase-2.2: 3.0.0-1.18
Reporter: JankoWilliam
Fix For: hbase-3.0.0
I want to write 50+ label values (all of them 0/1) into one HBase column family
under a single rowkey, for example:
{"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}
Here is the same case reduced to four label values:
{"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1"}
{code:java}
--source:
CREATE TABLE kafka_table_(
`id` STRING,
`q1` STRING,
`q2` STRING,
`q3` STRING,
`q4` STRING
) WITH (
...
'connector'='kafka',
'format'='json',
...
);
--sink:
CREATE TABLE hbase_table_ (
rowkey STRING,
cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
PRIMARY KEY (rowkey ) NOT ENFORCED
) WITH (
'connector' = 'my-hbase-2.2',
'table-name' = 'test_table',
'zookeeper.quorum' = '127.0.0.1'
);
--insert:
insert into hbase_table_
select
id AS rowkey ,
ROW( cast(q1 as INT),cast(q2 as INT),cast(q3 as INT),cast(q4 as INT)) as cf
from kafka_table_ ;{code}
hbase:
hbase(main):016:0> scan 'test_table'
ROW COLUMN+CELL
1111 column=cf:q1,
timestamp=0000000000001, value=\x00\x00\x00\x00
1111 column=cf:q2,
timestamp=0000000000001, value=\x00\x00\x00\x01
1111 column=cf:q3,
timestamp=0000000000001, value=\x00\x00\x00\x00
1111 column=cf:q4,
timestamp=0000000000001, value=\x00\x00\x00\x01
The upstream data has a fixed set of 50+ key-value pairs, of which only very few
values are 1 (the default label value is 0). For example, if only q2=1 and q4=1,
I want HBase to store just those columns:
hbase(main):016:0> scan 'test_table'
ROW COLUMN+CELL
1111 column=cf:q2,
timestamp=0000000000001, value=\x00\x00\x00\x01
1111 column=cf:q4,
timestamp=0000000000001, value=\x00\x00\x00\x01
When I set 'sink.ignore-null-value' = 'true', the sink merely skips updating the
null values; it does not remove columns written earlier, so downstream consumers
still read all of the previously written columns (50+), even though only 2 of
them are truly 1:
{code:java}
--sink:
CREATE TABLE hbase_table_ (
rowkey STRING,
cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
PRIMARY KEY (rowkey ) NOT ENFORCED
) WITH (
'connector' = 'my-hbase-2.2',
'table-name' = 'test_table',
'sink.ignore-null-value' = 'true',
'zookeeper.quorum' = '127.0.0.1'
);
--insert:
insert into hbase_table_
select
id AS rowkey ,
ROW(
case when q1 <> '0' then cast(q1 as INT) else null end,
case when q2 <> '0' then cast(q2 as INT) else null end,
case when q3 <> '0' then cast(q3 as INT) else null end,
case when q4 <> '0' then cast(q4 as INT) else null end
) as cf
from kafka_table_ ; {code}
hbase(main):016:0> scan 'test_table'
ROW COLUMN+CELL
1111 column=cf:q1,
timestamp=0000000000001, value=\x00\x00\x00\x00
1111 column=cf:q2,
timestamp=0000000000002, value=\x00\x00\x00\x01
1111 column=cf:q3,
timestamp=0000000000001, value=\x00\x00\x00\x00
1111 column=cf:q4,
timestamp=0000000000002, value=\x00\x00\x00\x01
No other existing configuration covers this case, so I hope the connector can
support overwriting by rowKey, i.e. deleting the existing row before writing the
new data.
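One way the requested behavior could be surfaced is as a sink option, sketched
below as a hypothetical 'sink.delete-before-put' (this option name is
illustrative only and does not exist in the current connector); combined with
'sink.ignore-null-value', it would issue an HBase Delete for the rowkey before
each Put, so only the non-null columns survive:
{code:sql}
-- Sketch only: 'sink.delete-before-put' is a hypothetical option, not part of
-- the released flink-connector-hbase options.
CREATE TABLE hbase_table_ (
  rowkey STRING,
  cf ROW<q1 INT, q2 INT, q3 INT, q4 INT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'test_table',
  'sink.ignore-null-value' = 'true',
  'sink.delete-before-put' = 'true',
  'zookeeper.quorum' = '127.0.0.1'
);
{code}
With such an option, the second scan above would return only cf:q2 and cf:q4,
since the stale cf:q1 and cf:q3 cells from the earlier write would have been
deleted first.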
--
This message was sent by Atlassian Jira
(v8.20.10#820010)