[
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785563#comment-17785563
]
david radley edited comment on FLINK-33365 at 11/13/23 6:18 PM:
----------------------------------------------------------------
[~Sergey Nuyanzin] [~qingwei91] thanks for your input. The two workarounds do
not work for me on master for the reported case.
It sounds like the introduction of the predicate pushdown capability for JDBC
broke this case. At the moment, JdbcRowDataLookupFunction only adds the
keyNames to the conditions:
{{this.query = options.getDialect().getSelectFromStatement(options.getTableName(), fieldNames, keyNames);}}
Since I have been assigned the issue, I was hoping to contribute a fix.
[~qingwei91], is this still possible, or do you plan to fix this yourself? I
see that the getLookupRuntimeProvider method does have access to
this.resolvedPredicates and pushdownParams; it would be great if we could use
these to build the condition that is added alongside the keyNames.
It would be great to hear your thoughts on this.
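A minimal Java sketch of that idea, assuming the pushed-down filters are available as SQL fragments containing {{?}} placeholders (what this.resolvedPredicates is assumed to hold) plus a separate array of values (what pushdownParams is assumed to hold). The class and method names are hypothetical; this is not the connector's actual dialect or lookup-function code, and it skips identifier quoting. The main point it shows is that the key placeholders and the pushdown placeholders must be bound in the same order in which they appear in the WHERE clause.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; not Flink's JdbcDialect or JdbcRowDataLookupFunction code.
class LookupQuerySketch {

    // Builds "SELECT f1, f2 FROM t WHERE k1 = ? AND ... AND <predicate> ...".
    // Key conditions come first, so key values must be bound before pushdown params.
    static String lookupQuery(String table, String[] fields, String[] keys, List<String> predicates) {
        List<String> conditions = new ArrayList<>();
        for (String key : keys) {
            conditions.add(key + " = ?");
        }
        // Assumption: each predicate is an already-rendered SQL fragment with '?' placeholders.
        conditions.addAll(predicates);
        String sql = "SELECT " + String.join(", ", fields) + " FROM " + table;
        return conditions.isEmpty() ? sql : sql + " WHERE " + String.join(" AND ", conditions);
    }

    // Parameter values in the same order as the '?' placeholders produced above.
    static List<Object> bindOrder(Object[] keyValues, Object[] pushdownParams) {
        List<Object> params = new ArrayList<>(Arrays.asList(keyValues));
        params.addAll(Arrays.asList(pushdownParams));
        return params;
    }

    public static void main(String[] args) {
        String query = lookupQuery("b", new String[] {"ip", "type"}, new String[] {"ip"},
                List.of("(type = ?)"));
        System.out.println(query); // SELECT ip, type FROM b WHERE ip = ? AND (type = ?)
        System.out.println(bindOrder(new Object[] {"10.0.0.1"}, new Object[] {0})); // [10.0.0.1, 0]
    }
}
```

Putting the key conditions first keeps the existing key binding unchanged and simply appends the pushdown values after it.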
was (Author: JIRAUSER300523):
[~Sergey Nuyanzin] [~qingwei91] thanks for your input. The two workarounds do
not work for me on master for the reported case.
It sounds like the introduction of the predicate pushdown capability for JDBC
broke this case. At the moment, JdbcRowDataLookupFunction only adds the
keyNames to the conditions:
{{this.query = options.getDialect().getSelectFromStatement(options.getTableName(), fieldNames, keyNames);}}
It seems to me that we need to save the filter information when we run the
pushdown logic, so that we can add it as a condition to the last parameter.
Since I have been assigned the issue, I was hoping to contribute a fix.
[~qingwei91], is this still possible, or do you plan to fix this yourself? It
would be great to hear your thoughts on this.
> Missing filter condition in execution plan containing lookup join with mysql
> jdbc connector
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with
> flink-connector-jdbc-3.1.1-1.17.jar
> Reporter: macdoor615
> Assignee: david radley
> Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png,
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> Create a table in Flink with sql-client.sh:
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
> ip string,
> proctime as proctime()
> )
> WITH (
> 'connector' = 'datagen'
> );{code}
> Create a table in MySQL:
> {code:java}
> create table b (
> ip varchar(20),
> type int
> ); {code}
>
> With Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*,
> execute in sql-client.sh:
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and
> a.ip = b.ip; {code}
> This gives the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin],
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip
> AS VARCHAR(2147483647)) AS ip0])
> +- Calc(select=[ip, PROCTIME() AS proctime])
> +- TableSourceScan(table=[[default_catalog, default_database, a]],
> fields=[ip]){code}
>
> Executing the same SQL in sql-client with Flink 1.17.1 / 1.18.0 and
> *flink-connector-jdbc-3.0.0-1.16.jar* gives the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin],
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
> +- Calc(select=[ip, PROCTIME() AS proctime])
> +- TableSourceScan(table=[[default_catalog, default_database, a]],
> fields=[ip]) {code}
> With flink-connector-jdbc-3.1.1-1.17.jar, the condition is
> *lookup=[ip=ip]*, so the b.type = 0 filter is missing;
> with flink-connector-jdbc-3.0.0-1.16.jar, the condition is
> *lookup=[type=0, ip=ip], where=[(type = 0)]*.
>
> In our real-world production environment, this leads to incorrect data output.
>
>
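To make the impact of the two lookup conditions concrete, here is a toy Java model of a left lookup join against an in-memory copy of table b. All names are hypothetical and this is not Flink code; it assumes, based on the two plans above, that the only difference is whether type = 0 is applied when rows of b are looked up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory model of a left lookup join; not Flink code.
class LookupJoinSketch {

    // One row of the MySQL dimension table b(ip, type).
    static final class BRow {
        final String ip;
        final int type;

        BRow(String ip, int type) {
            this.ip = ip;
            this.type = type;
        }
    }

    // For each probe ip, emit every b row with the same ip that also passes extraFilter;
    // emit "ip -> null" when nothing matches (left outer join semantics).
    static List<String> leftLookupJoin(List<String> probeIps, List<BRow> b, Predicate<BRow> extraFilter) {
        List<String> out = new ArrayList<>();
        for (String ip : probeIps) {
            boolean matched = false;
            for (BRow row : b) {
                if (row.ip.equals(ip) && extraFilter.test(row)) {
                    out.add(ip + " -> (" + row.ip + ", " + row.type + ")");
                    matched = true;
                }
            }
            if (!matched) {
                out.add(ip + " -> null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = List.of("10.0.0.1", "10.0.0.2");
        List<BRow> b = List.of(new BRow("10.0.0.1", 0), new BRow("10.0.0.1", 1), new BRow("10.0.0.2", 1));
        // Models lookup=[type=0, ip=ip]: the filter on b is applied.
        System.out.println(leftLookupJoin(a, b, row -> row.type == 0));
        // Models lookup=[ip=ip]: the filter is lost, so type = 1 rows also join.
        System.out.println(leftLookupJoin(a, b, row -> true));
    }
}
```

With the filter, 10.0.0.2 has no match and yields null; without it, 10.0.0.2 joins its type = 1 row and 10.0.0.1 produces an extra row. Both plans also project CAST(0 AS INTEGER) AS type, which suggests that such wrongly joined rows may be reported with type 0 rather than their actual value.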
--
This message was sent by Atlassian Jira
(v8.20.10#820010)