[
https://issues.apache.org/jira/browse/FLINK-33566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
JankoWilliam updated FLINK-33566:
---------------------------------
Environment:
flink: 1.18.0
hbase: 2.2.3
flink-connector-hbase-2.2:3.0.0-1.18
was:
flink: 1.18.0
hbase: 2.2.3
flink-connector-hbase-2.2:3.0.0-1.18
> HBase sql-connector needs overwrite the rowKey
> ----------------------------------------------
>
> Key: FLINK-33566
> URL: https://issues.apache.org/jira/browse/FLINK-33566
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / HBase
> Affects Versions: 1.18.0
> Environment: flink: 1.18.0
> hbase: 2.2.3
> flink-connector-hbase-2.2:3.0.0-1.18
> Reporter: JankoWilliam
> Priority: Major
> Labels: HBase-2.0, flink-connector-hbase
> Fix For: hbase-3.0.0
>
>
> When I want to write 50+ label values (all of them 0/1) to a column family
> under one rowkey in HBase, for example:
> {"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}
> Here are four of the label values as an example:
> {"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1"}
> {code:java}
> --source:
> CREATE TABLE kafka_table_(
> `id` STRING,
> `q1` STRING,
> `q2` STRING,
> `q3` STRING,
> `q4` STRING
> ) WITH (
> ...
> 'connector'='kafka',
> 'format'='json',
> ...
> );
> --sink:
> CREATE TABLE hbase_table_ (
> rowkey STRING,
> cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
> PRIMARY KEY (rowkey ) NOT ENFORCED
> ) WITH (
> 'connector' = 'my-hbase-2.2',
> 'table-name' = 'test_table',
> 'zookeeper.quorum' = '127.0.0.1'
> );
> --insert:
> insert into hbase_table_
> select
> id AS rowkey ,
> ROW( cast(q1 as INT),cast(q2 as INT),cast(q3 as INT),cast(q4 as INT)) as cf
> from kafka_table_ ;{code}
> hbase:
> hbase(main):016:0> scan 'test_table'
> ROW COLUMN+CELL
> 1111 column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00
> 1111 column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01
> 1111 column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00
> 1111 column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01
>
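> The cell values above are the 4-byte big-endian encoding the connector uses
> for an INT column, which is why 0 and 1 appear as \x00\x00\x00\x00 and
> \x00\x00\x00\x01. A minimal sketch of that encoding in plain Java (ByteBuffer
> stands in here for the HBase Bytes utilities the connector actually uses):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class IntCellEncoding {
    // An INT column is serialized as a 4-byte big-endian value,
    // so 1 is stored in the cell as \x00\x00\x00\x01.
    static byte[] encode(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encode(0)));
        System.out.println(Arrays.toString(encode(1)));
    }
}
```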
> The upstream data always carries the same fixed set of 50+ key-value pairs,
> but very few of the values are 1 (the default label value is 0). For example,
> only one or two values are 1: q2=1, q4=1. So I want HBase to store only the
> non-default values:
> hbase(main):016:0> scan 'test_table'
> ROW COLUMN+CELL
> 1111 column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01
> 1111 column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01
>
>
> When I use the 'sink.ignore-null-value' option here, it only skips writing
> the null values; the old cells remain, so downstream consumers still read all
> the values (e.g. 50+) even though only 2 of them are truly 1:
> {code:java}
> --sink:
> CREATE TABLE hbase_table_ (
> rowkey STRING,
> cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
> PRIMARY KEY (rowkey ) NOT ENFORCED
> ) WITH (
> 'connector' = 'my-hbase-2.2',
> 'table-name' = 'test_table',
> 'sink.ignore-null-value' = 'true',
> 'zookeeper.quorum' = '127.0.0.1'
> );
> --insert:
> insert into hbase_table_
> select
> id AS rowkey ,
> ROW(
> case when q1 <> '0' then cast(q1 as INT) else null end,
> case when q2 <> '0' then cast(q2 as INT) else null end,
> case when q3 <> '0' then cast(q3 as INT) else null end,
> case when q4 <> '0' then cast(q4 as INT) else null end
> ) as cf
> from kafka_table_ ; {code}
> hbase(main):016:0> scan 'test_table'
> ROW COLUMN+CELL
> 1111 column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00
> 1111 column=cf:q2, timestamp=0000000000002, value=\x00\x00\x00\x01
> 1111 column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00
> 1111 column=cf:q4, timestamp=0000000000002, value=\x00\x00\x00\x01
>
> There is no other configuration available for this, so I hope the connector
> can support overwriting a rowKey, that is, deleting the existing row for the
> rowkey before writing the new data.
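> One possible shape for this, sketched as a new connector option. Note that
> 'sink.overwrite-row' is a hypothetical option name used only for
> illustration; it does not exist in the connector today:

```sql
-- sink (hypothetical option, for illustration only):
CREATE TABLE hbase_table_ (
  rowkey STRING,
  cf ROW<q1 INT, q2 INT, q3 INT, q4 INT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'my-hbase-2.2',
  'table-name' = 'test_table',
  -- hypothetical: issue a Delete for the rowkey before the Put,
  -- so columns that are NULL in the new row are removed instead of kept
  'sink.overwrite-row' = 'true',
  'zookeeper.quorum' = '127.0.0.1'
);
```

> With such an option, only cf:q2 and cf:q4 would remain after the insert,
> matching the desired scan output above.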
--
This message was sent by Atlassian Jira
(v8.20.10#820010)