[
https://issues.apache.org/jira/browse/FLINK-18461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu updated FLINK-18461:
----------------------------
Description:
{code:sql}
CREATE TABLE t_pick_order (
  order_no VARCHAR,
  status INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '172.19.78.32:9092',
  'format' = 'canal-json'
);

CREATE TABLE order_status (
  order_no VARCHAR,
  status INT,
  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:3306/flink_test',
  'table-name' = 'order_status',
  'username' = 'dev',
  'password' = 'xxxx'
);

INSERT INTO order_status SELECT order_no, status FROM t_pick_order;
{code}
The above queries throw the following exception:
{code}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER]
can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner,
please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database,
t_pick_order]], fields=[order_no, status])
{code}
This is a planner bug: the planner does not fall back to the
{{BEFORE_AND_AFTER}} trait when {{ONLY_UPDATE_AFTER}} can't be satisfied. As a
result, a changelog source can't be written into an upsert sink.
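The fallback described above can be illustrated with a small Python model. This is only a sketch and not Flink planner code: {{resolve_trait}}, {{apply_to_upsert_sink}} and the {{RowKind}} enum are hypothetical names, and the changelog rows are made up. It assumes the sink prefers {{ONLY_UPDATE_AFTER}} but can also accept {{BEFORE_AND_AFTER}} by skipping the update-before rows.

```python
from enum import Enum


class RowKind(Enum):
    """Change kinds a changelog source may emit (illustrative model)."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


# Trait names taken from the error message in this issue.
ONLY_UPDATE_AFTER = "ONLY_UPDATE_AFTER"
BEFORE_AND_AFTER = "BEFORE_AND_AFTER"


def resolve_trait(acceptable, provided):
    """Hypothetical negotiation: return the first acceptable trait that the
    source can provide, in order of preference, or fail if none matches."""
    for candidate in acceptable:
        if candidate == provided:
            return candidate
    raise ValueError(
        f"Provided trait [{provided}] can't satisfy required trait {acceptable}"
    )


def apply_to_upsert_sink(changelog):
    """Fold (kind, order_no, status) rows into key -> status state,
    the way a sink keyed on order_no could apply them."""
    table = {}
    for kind, order_no, status in changelog:
        if kind is RowKind.UPDATE_BEFORE:
            continue  # not needed: the following UPDATE_AFTER overwrites by key
        if kind is RowKind.DELETE:
            table.pop(order_no, None)
        else:  # INSERT and UPDATE_AFTER both upsert by key
            table[order_no] = status
    return table


# Made-up changelog carrying both update-before and update-after rows.
changelog = [
    (RowKind.INSERT, "A", 1),
    (RowKind.UPDATE_BEFORE, "A", 1),
    (RowKind.UPDATE_AFTER, "A", 2),
    (RowKind.INSERT, "B", 1),
    (RowKind.DELETE, "B", 1),
]

# With the fallback in the preference list, negotiation succeeds.
trait = resolve_trait([ONLY_UPDATE_AFTER, BEFORE_AND_AFTER], BEFORE_AND_AFTER)
result = apply_to_upsert_sink(changelog)
```

Passing only {{ONLY_UPDATE_AFTER}} as the acceptable list makes {{resolve_trait}} raise, which mirrors the failure reported here. Adding {{BEFORE_AND_AFTER}} as a second choice lets the same changelog reach the sink.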
> Changelog source can't be inserted into upsert sink
> ---------------------------------------------------
>
> Key: FLINK-18461
> URL: https://issues.apache.org/jira/browse/FLINK-18461
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.11.0
>
>
> {code:sql}
> CREATE TABLE t_pick_order (
>   order_no VARCHAR,
>   status INT
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'example',
>   'scan.startup.mode' = 'latest-offset',
>   'properties.bootstrap.servers' = '172.19.78.32:9092',
>   'format' = 'canal-json'
> );
>
> CREATE TABLE order_status (
>   order_no VARCHAR,
>   status INT,
>   PRIMARY KEY (order_no) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://xxx:3306/flink_test',
>   'table-name' = 'order_status',
>   'username' = 'dev',
>   'password' = 'xxxx'
> );
>
> INSERT INTO order_status SELECT order_no, status FROM t_pick_order;
> {code}
> The above queries throw the following exception:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER]
> can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner,
> please file an issue.
> Current node is TableSourceScan(table=[[default_catalog, default_database,
> t_pick_order]], fields=[order_no, status])
> {code}
> This is a planner bug: the planner does not fall back to the
> {{BEFORE_AND_AFTER}} trait when {{ONLY_UPDATE_AFTER}} can't be satisfied. As
> a result, a changelog source can't be written into an upsert sink.