[ 
https://issues.apache.org/jira/browse/FLINK-32778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

simenliuxing updated FLINK-32778:
---------------------------------
    Description: 
{code:sql}

-- flink version:1.16.1
-- parallelism.default: 1

-- Source table s1: JSON-encoded records read from Kafka topic 'topic1',
-- starting at the earliest available offset.
CREATE TABLE s1 (
    id    STRING,
    gk    BIGINT,
    price INT
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'xx:9092',
    'properties.group.id' = 'xx-xx',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json',
    'topic' = 'topic1'
);

-- Source table s2: JSON-encoded (id, name) records read from Kafka topic
-- 'topic2', starting at the earliest available offset.
CREATE TABLE s2 (
    id   STRING,
    name STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'xx:9092',
    'properties.group.id' = 'xx-xx',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json',
    'topic' = 'topic2'
);

-- Result table backed by the 'print' connector; the changelog rows quoted in
-- the steps below (+I / -D) are what this table emits.
CREATE TABLE sink (
    id    STRING,
    name  STRING,
    gk    BIGINT,
    price INT
) WITH (
    'connector' = 'print'
);

-- View v1: one row per (id, gk) group of s1, carrying LAST_VALUE(price)
-- for that group.
CREATE VIEW v1 AS
SELECT
    id,
    gk,
    LAST_VALUE(price) AS price
FROM s1
GROUP BY
    id,
    gk;

-- Left-join the aggregated view v1 with s2 on id and write the result to sink.
-- v1 rows without a matching s2 row are emitted with a NULL name.
INSERT INTO sink
SELECT
    v1.id,
    s2.name,
    v1.gk,
    v1.price
FROM v1
LEFT JOIN s2
    ON v1.id = s2.id;

-- 1.Enter two pieces of data into the topic1 topic:
-- {"id":"1","gk":758,"price":100}
-- {"id":"1","gk":1818,"price":200}

-- The output is as follows:
-- +I[1, null, 758, 100]
-- +I[1, null, 1818, 200]

-- 2.Enter one piece of data into the topic2 topic:
-- {"id":1,"name":"z3"}

-- The output is as follows:
-- -D[1, null, 1818, 200]
-- -D[1, null, 758, 100]
-- +I[1, z3, 1818, 200]
-- +I[1, z3, 758, 100]

-- My expectation is that the output should be in the order of input, like below:
-- -D[1, null, 758, 100]
-- -D[1, null, 1818, 200]
-- +I[1, z3, 758, 100]
-- +I[1, z3, 1818, 200]

-- 3.When I re-run the above sql, the results are output in the order of input
-- +I[1, z3, 758, 100]
-- +I[1, z3, 1818, 200]

-- Is there a way to control this uncertainty?

{code}


  was:

{code:sql}

-- flink version:1.16.1
-- parallelism.default: 1

CREATE TABLE s1(
    id string,
    gk bigint,
    price int
 )WITH(
    'connector' = 'kafka'
    ,'properties.bootstrap.servers' = 'xx:9092'
    ,'properties.group.id' = 'xx-xx'
    ,'scan.startup.mode' = 'earliest-offset'
    ,'value.format' = 'json'
    ,'topic' = 'topic1'
 );

CREATE TABLE s2(
    id string,
    name string
 )WITH(
    'connector' = 'kafka'
    ,'properties.bootstrap.servers' = 'xx:9092'
    ,'properties.group.id' = 'xx-xx'
    ,'scan.startup.mode' = 'earliest-offset'
    ,'value.format' = 'json'
    ,'topic' = 'topic2'
 );

create table sink(
    id string,
    name string,
    gk bigint,
    price int
 )with(
    'connector'='print'
 );

create view v1 as select
    id,
    gk,
    last_value(price) price
from s1
group by id,gk;

insert into sink
select
    v1.id,
    s2.name,
    v1.gk,
    v1.price
from v1
left join s2 on v1.id=s2.id;

1.Enter two pieces of data into the topic1 topic:
{"id":"1","gk":758,"price":100}
{"id":"1","gk":1818,"price":200}

The output is as follows:
+I[1, null, 758, 100]
+I[1, null, 1818, 200]

2.Enter two pieces of data into the topic2 topic:
{"id":1,"name":"z3"}

The output is as follows:
-D[1, null, 1818, 200]
-D[1, null, 758, 100]
+I[1, z3, 1818, 200]
+I[1, z3, 758, 100]

My doubt is that the output should be in the order of input , like below:
-D[1, null, 758, 100]
-D[1, null, 1818, 200]
+I[1, z3, 758, 100]
+I[1, z3, 1818, 200]

3.When I re-run the above sql, the results are output in the order of input
+I[1, z3, 758, 100]
+I[1, z3, 1818, 200]

Is there a way to control this uncertainty?

{code}



> Stream join data output sequence is inconsistent with input sequence
> --------------------------------------------------------------------
>
>                 Key: FLINK-32778
>                 URL: https://issues.apache.org/jira/browse/FLINK-32778
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.1
>            Reporter: simenliuxing
>            Priority: Major
>             Fix For: 1.7.3
>
>
> {code:sql}
> -- flink version:1.16.1
> -- parallelism.default: 1
> CREATE TABLE s1(
>     id string,
>     gk bigint,
>     price int
>  )WITH(
>     'connector' = 'kafka'
>     ,'properties.bootstrap.servers' = 'xx:9092'
>     ,'properties.group.id' = 'xx-xx'
>     ,'scan.startup.mode' = 'earliest-offset'
>     ,'value.format' = 'json'
>     ,'topic' = 'topic1'
>  );
> CREATE TABLE s2(
>     id string,
>     name string
>  )WITH(
>     'connector' = 'kafka'
>     ,'properties.bootstrap.servers' = 'xx:9092'
>     ,'properties.group.id' = 'xx-xx'
>     ,'scan.startup.mode' = 'earliest-offset'
>     ,'value.format' = 'json'
>     ,'topic' = 'topic2'
>  );
> create table sink(
>     id string,
>     name string,
>     gk bigint,
>     price int
>  )with(
>     'connector'='print'
>  );
> create view v1 as select
>     id,
>     gk,
>     last_value(price) price
> from s1
> group by id,gk;
> insert into sink
> select
>     v1.id,
>     s2.name,
>     v1.gk,
>     v1.price
> from v1
> left join s2 on v1.id=s2.id;
> -- 1.Enter two pieces of data into the topic1 topic:
> -- {"id":"1","gk":758,"price":100}
> -- {"id":"1","gk":1818,"price":200}
> -- The output is as follows:
> -- +I[1, null, 758, 100]
> -- +I[1, null, 1818, 200]
> -- 2.Enter one piece of data into the topic2 topic:
> -- {"id":1,"name":"z3"}
> -- The output is as follows:
> -- -D[1, null, 1818, 200]
> -- -D[1, null, 758, 100]
> -- +I[1, z3, 1818, 200]
> -- +I[1, z3, 758, 100]
> -- My expectation is that the output should be in the order of input, like below:
> -- -D[1, null, 758, 100]
> -- -D[1, null, 1818, 200]
> -- +I[1, z3, 758, 100]
> -- +I[1, z3, 1818, 200]
> -- 3.When I re-run the above sql, the results are output in the order of input
> -- +I[1, z3, 758, 100]
> -- +I[1, z3, 1818, 200]
> -- Is there a way to control this uncertainty?
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to