[
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786427#comment-17786427
]
david radley edited comment on FLINK-33365 at 11/15/23 4:34 PM:
----------------------------------------------------------------
[~libenchao]
I have moved the code out of AbstractDialect and pushed up the change. I could
not see how to get a _PreparedStatement_ on which to set the condition.
Please could you give me some pointers?
I have successfully tested using the supplied test tables:
* other simple predicates work
* multiple simple predicates work
I added extra tests to JdbcDynamicTableSourceITCase but cannot test those
changes, as I get errors locally when running the tests even without my
changes. Any pointers would be great; I am running on a Mac.
I wanted to test 2 lookup keys, so I created new tables d (on MariaDB) and e
(on Paimon), then ran some joins with filters and a join with multiple keys.
The results do not look right to me (but I may be misunderstanding). WDYT?
select * from mariadb_catalog.menagerie.d ;
+----+--------------------------------+-------------+-------------+
| op | ip | type | age |
+----+--------------------------------+-------------+-------------+
| +I | 10.10.10.10 | 1 | 30 |
| +I | 10.10.10.10 | 2 | 40 |
| +I | 10.10.10.10 | 2 | 50 |
| +I | 10.10.10.10 | 3 | 50 |
+----+--------------------------------+-------------+-------------+
Received a total of 4 rows
Flink SQL> select * from e;
+----+--------------------------------+-------------+-------------------------+
| op | ip | age | proctime |
+----+--------------------------------+-------------+-------------------------+
| +I | 10.10.10.10 | 40 | 2023-11-15 16:12:57.553 |
| +I | 10.10.10.10 | 50 | 2023-11-15 16:12:57.554 |
Flink SQL> SELECT * FROM e left join mariadb_catalog.menagerie.d FOR
SYSTEM_TIME AS OF e.proctime on d.type = 2 and d.age = 50 and e.ip = d.ip;
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op | ip | age | proctime | ip0 | type | age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| +I | 10.10.10.10 | 40 | 2023-11-15 16:08:40.973 | 10.10.10.10 | 2 | 50 |
| +I | 10.10.10.10 | 50 | 2023-11-15 16:08:40.974 | 10.10.10.10 | 2 | 50 |
Is this what you would expect?
Also I get
SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF
e.proctime on e.age = d.age;
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op | ip | age | proctime | ip0 | type | age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
^CQuery terminated, received a total of 0 row
and
SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF
e.proctime on e.ip = d.ip;
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op | ip | age | proctime | ip0 | type | age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
^CQuery terminated, received a total of 0 row
and
SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF
e.proctime on e.age = d.age and d.ip = e.ip;
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op | ip | age | proctime | ip0 | type | age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
^CQuery terminated, received a total of 0 row
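As a cross-check, here is a minimal sketch of what plain SQL LEFT JOIN semantics would give for the rows of d and e shown above. It is ordinary Java, not code from Flink or the JDBC connector, and it says nothing about how the lookup join is actually executed. The class name LeftJoinSketch and its helper types are made up for illustration, and proctime is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical helper for illustration only; not part of Flink.
public class LeftJoinSketch {
    // One row of table e (proctime omitted).
    public static final class E {
        public final String ip;
        public final int age;
        E(String ip, int age) { this.ip = ip; this.age = age; }
    }

    // One row of table d.
    public static final class D {
        public final String ip;
        public final int type;
        public final int age;
        D(String ip, int type, int age) { this.ip = ip; this.type = type; this.age = age; }
    }

    // Rows copied from the two SELECT outputs above.
    static final List<E> E_ROWS = Arrays.asList(
            new E("10.10.10.10", 40),
            new E("10.10.10.10", 50));
    static final List<D> D_ROWS = Arrays.asList(
            new D("10.10.10.10", 1, 30),
            new D("10.10.10.10", 2, 40),
            new D("10.10.10.10", 2, 50),
            new D("10.10.10.10", 3, 50));

    // Plain LEFT JOIN: each e row is emitted once per matching d row,
    // or once with a null right side when no d row matches.
    public static List<String> leftJoin(BiPredicate<E, D> on) {
        List<String> out = new ArrayList<>();
        for (E e : E_ROWS) {
            boolean matched = false;
            for (D d : D_ROWS) {
                if (on.test(e, d)) {
                    out.add(e.age + " -> " + d.type + "/" + d.age);
                    matched = true;
                }
            }
            if (!matched) {
                out.add(e.age + " -> null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The four ON clauses used in the queries above, in the same order.
        System.out.println(leftJoin((e, d) -> d.type == 2 && d.age == 50 && e.ip.equals(d.ip)));
        System.out.println(leftJoin((e, d) -> e.age == d.age));
        System.out.println(leftJoin((e, d) -> e.ip.equals(d.ip)));
        System.out.println(leftJoin((e, d) -> e.age == d.age && d.ip.equals(e.ip)));
    }
}
```

Under these semantics the first ON clause pairs both e rows with the single d row that has type 2 and age 50, which matches the two rows returned above. The age-based joins would give three rows and the ip-only join eight, rather than none.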
> Missing filter condition in execution plan containing lookup join with mysql
> jdbc connector
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with
> flink-connector-jdbc-3.1.1-1.17.jar
> Reporter: macdoor615
> Assignee: david radley
> Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png,
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
> ip string,
> proctime as proctime()
> )
> WITH (
> 'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
> ip varchar(20),
> type int
> ); {code}
>
> With Flink 1.17.1/1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*,
> execute in sql-client.sh
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin],
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip
> AS VARCHAR(2147483647)) AS ip0])
> +- Calc(select=[ip, PROCTIME() AS proctime])
> +- TableSourceScan(table=[[default_catalog, default_database, a]],
> fields=[ip]){code}
>
> execute the same SQL in sql-client with Flink 1.17.1/1.18.0 and
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin],
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
> +- Calc(select=[ip, PROCTIME() AS proctime])
> +- TableSourceScan(table=[[default_catalog, default_database, a]],
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar, the condition is
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar , the condition is
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>
> In our real-world production environment, this leads to incorrect data output.
>
>