zlzhang0122 created FLINK-33657:
-----------------------------------

             Summary: Insert message in top n without row number didn't 
consider it's number and may not correct
                 Key: FLINK-33657
                 URL: https://issues.apache.org/jira/browse/FLINK-33657
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Runtime
    Affects Versions: 1.17.1, 1.16.2
            Reporter: zlzhang0122


A new insert message in Top-N without row number does not take its rank into 
account: the record is simply passed to the next operator via collectInsert. 
The result may be incorrect when the next operator collects all the Top-N 
records and aggregates them.
 
For example:
create table user_info (
  user_id int,
  item_id int,
  app string,
  dt timestamp
) with (
  'connector' = 'kafka',
  ...
);

create table redis_sink (
  redis_key string,
  hash_key string,
  hash_value string
) with (
  'connector' = 'redis',
  'command' = 'hmset',
  'nodes' = 'xxx',
  'additional-ttl' = 'xx'
);

create view base_lastn as
select * from (
  select user_id, item_id, app, dt,
         row_number() over (partition by item_id, app order by dt desc) as rn
  from user_info
) t where rn <= 5;

insert into redis_sink
select
  concat('id', cast(item_id as string)) as redis_key,
  app as hash_key,
  cast(user_id as string) as hash_value
from base_lastn where rn = 1;

insert into redis_sink
select
  concat('list_', cast(item_id as string)) as redis_key,
  app as hash_key,
  listagg(cast(user_id as string), ',') as hash_value
from base_lastn group by item_id, app;
 

In some scenarios, the value written for the top 1 appears as neither the 
first nor the last value of the aggregated top 5.
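
The scenario can be sketched with a small Python simulation. This is a simplified model and not Flink's actual operator code: the function names, the integer timestamps, and the sample user ids are all hypothetical, and the downstream aggregate is assumed to append values in arrival order. It shows that when a record with an older dt arrives late but still falls inside the top n, it is emitted as a plain insert and lands at the end of the aggregated list, so the top 1 ends up in the middle.

```python
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[int, int]  # (dt, user_id); dt is a plain integer timestamp here


def top_n_without_row_number(events: Iterable[Row], n: int) -> Iterator[Tuple[str, Row]]:
    """Simplified model: emit +I for a row that enters the top n (by dt desc)
    and -D for a row pushed out of it. No rank is attached to the output."""
    buffer: List[Row] = []
    for row in events:
        buffer.append(row)
        buffer.sort(key=lambda r: r[0], reverse=True)
        evicted = buffer[n:]
        del buffer[n:]
        if row in evicted:
            continue  # the new row did not make it into the top n
        for old in evicted:
            yield ("-D", old)
        yield ("+I", row)  # emitted without any position information


def listagg_in_arrival_order(changelog: Iterable[Tuple[str, Row]]) -> List[int]:
    """Assumed downstream behavior: append on insert, remove on delete."""
    values: List[int] = []
    for kind, (_, user_id) in changelog:
        if kind == "+I":
            values.append(user_id)
        else:
            values.remove(user_id)
    return values


# The row with dt=2 arrives after the row with dt=3 (out of order).
events = [(1, 101), (3, 103), (2, 102)]
changelog = list(top_n_without_row_number(events, n=5))
aggregated = listagg_in_arrival_order(changelog)
top1 = max(events, key=lambda r: r[0])[1]

print(",".join(map(str, aggregated)))  # 101,103,102
print(top1)                            # 103
```

Here the top 1 (user 103) is neither the first nor the last element of the aggregated list, which matches the inconsistency between the two sinks described above.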



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
