[
https://issues.apache.org/jira/browse/FLINK-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16975898#comment-16975898
]
Jark Wu edited comment on FLINK-14567 at 11/17/19 3:07 AM:
-----------------------------------------------------------
Another solution is to let the sink concatenate the keys. For example, an HBase
sink can have more than one key field, say k1, k2, k3. A {{key-delimiter}}
option is then required to concatenate the key fields into a rowkey, and the
concatenated rowkey will always be of varchar type. The HBase sink will insert
the concatenated rowkey into the HBase table.
{code:sql}
create table my_table (
  k1 int,
  k2 varchar,
  k3 timestamp(3),
  f1 row<q1 bigint, q2 bigint>
) with (
  'connector.type' = 'hbase',
  'connector.key-delimiter' = '-'
);

-- source_table is a placeholder name for the input table
insert into my_table
select k1, k2, k3, ROW(count(*), count(distinct `user`))
from source_table
group by k1, k2, k3;
{code}
This is very similar to the [ElasticSearch
Connector|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector].
In this way, {{UpsertStreamTableSink#setKeyFields}} still works for the sink,
because the sink pretends it has a composite key with 3 fields.
However, this can't solve all the problems. For example, it does not help if
one of the 3 fields is derived from a group key by a transformation that loses
key information.
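To make the trade-off concrete, here is a minimal Python sketch. It is not Flink or HBase connector code: {{build_rowkey}} is a hypothetical helper, and the sample values (other than the two records taken from the issue description) are assumptions for illustration. It shows that joining with a delimiter separates the two records that collide under a plain concat, and that a lossy transformation of a key field (here, truncating a timestamp to its date) still maps distinct group keys to the same rowkey.

```python
from datetime import datetime


def build_rowkey(key_fields, delimiter="-"):
    """Join key field values into one string rowkey (hypothetical helper)."""
    return delimiter.join(str(value) for value in key_fields)


# Records from the issue description: distinct keys that collide under plain concat.
r1 = ("a", "b", "c")
r2 = ("ab", "", "c")
plain_collides = build_rowkey(r1, "") == build_rowkey(r2, "")

# With a delimiter, the two records map to different rowkeys.
rowkey1 = build_rowkey(r1)
rowkey2 = build_rowkey(r2)

# Assumed sample data: two distinct group keys that differ only in the time of day.
g1 = (1, "x", datetime(2019, 11, 17, 3, 7, 0))
g2 = (1, "x", datetime(2019, 11, 17, 9, 30, 0))

# Truncating the timestamp to a date is lossy, so both keys yield the same rowkey.
lossy1 = build_rowkey((g1[0], g1[1], g1[2].date()))
lossy2 = build_rowkey((g2[0], g2[1], g2[2].date()))
```

A delimiter only helps when the sink sees the original key fields. Once a key field has been transformed lossily upstream, no concatenation scheme in the sink can restore uniqueness.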
> Aggregate query with more than two group fields can't be write into HBase sink
> ------------------------------------------------------------------------------
>
> Key: FLINK-14567
> URL: https://issues.apache.org/jira/browse/FLINK-14567
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase, Table SQL / Legacy Planner, Table
> SQL / Planner
> Reporter: Jark Wu
> Priority: Critical
> Fix For: 1.10.0
>
>
> Suppose we have an HBase table sink with a varchar rowkey (also the primary
> key) and a bigint column, and we want to write the result of the following
> query into the sink using upsert mode. However, it fails the primary key
> check with the exception "UpsertStreamTableSink requires that Table has a
> full primary keys if it is updated."
> {code:sql}
> select concat(f0, '-', f1) as key, sum(f2)
> from T1
> group by f0, f1
> {code}
> This happens in both the blink planner and the old planner. That is because
> if the query works in update mode, a primary key must exist so that it can be
> extracted and set via {{UpsertStreamTableSink#setKeyFields}}.
> That's why we wanted to derive a primary key for concat in FLINK-14539.
> However, we found that the primary key is not preserved after concatenation.
> For example, suppose we have a primary key (f0, f1, f2) whose fields are all
> of varchar type, and two unique records ('a', 'b', 'c') and ('ab', '', 'c').
> The results of concat(f0, f1, f2) are the same for both, which means the
> concat result is no longer a primary key.
> So here comes the question: how can we properly support the HBase sink or
> similar use cases?