[
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785321#comment-17785321
]
Sergey Nuyanzin edited comment on FLINK-33365 at 11/12/23 10:20 PM:
--------------------------------------------------------------------
Looks like the root cause is an incomplete implementation of filter push down (FLINK-16024), at least for the case with
{noformat}
FOR SYSTEM_TIME AS OF{noformat}
(i.e. lookup joins).
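For background: a source advertises filter push down to the planner via the {{SupportsFilterPushDown}} ability, splitting the predicates into accepted and remaining ones. The sketch below is a minimal, hypothetical illustration ({{MyLookupSource}} is not real connector code) of the mechanism: a filter the connector accepts disappears from the optimized plan, so if the lookup runtime then never applies it, the condition is silently lost, which matches the behaviour reported here.
{code:java}
// Hypothetical sketch, NOT the actual JDBC connector implementation:
// a source that claims simple `field = literal` predicates for push down
// and leaves everything else to the planner.
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

public class MyLookupSource implements SupportsFilterPushDown {

    private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            if (isSimpleEquality(filter)) {
                accepted.add(filter);  // connector promises to evaluate this
            } else {
                remaining.add(filter); // planner keeps a where=[...] for this
            }
        }
        pushedFilters.addAll(accepted);
        // Accepted filters are removed from the plan; if the lookup runner
        // ignores `pushedFilters`, rows come back unfiltered.
        return Result.of(accepted, remaining);
    }

    private static boolean isSimpleEquality(ResolvedExpression expr) {
        return expr instanceof CallExpression
                && ((CallExpression) expr).getFunctionDefinition()
                        == BuiltInFunctionDefinitions.EQUALS;
    }
}
{code}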
Test case to reproduce: add this to {{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
// needed imports: java.util.Arrays, org.junit.Test, org.apache.flink.types.Row,
// org.apache.flink.table.planner.factories.TestValuesTableFactory
@Test
public void issue33365() {
    String dataId =
            TestValuesTableFactory.registerData(
                    Arrays.asList(
                            Row.of(1L, "Alice"),
                            Row.of(1L, "Alice"),
                            Row.of(2L, "Bob"),
                            Row.of(3L, "Charlie")));
    util.tableEnv()
            .executeSql(
                    String.format(
                            "CREATE TABLE value_source (\n"
                                    + "`id` BIGINT,\n"
                                    + "`name` STRING,\n"
                                    + "`proctime` AS PROCTIME()\n"
                                    + ") WITH (\n"
                                    + "'connector' = 'values', \n"
                                    + "'data-id' = '%s')",
                            dataId));
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D"
                    + " ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
and compare the plans before and after commit [https://github.com/apache/flink/pull/20140]
or the same commit in the jdbc connector repo
(https://github.com/apache/flink-connector-jdbc/commit/3e3b40e8cfadfc16a8ab74d4ef6a3ab3ceafa57b):
it shows different results.
At the same time there is a bunch of workarounds, such as adding something meaningless, a cast, or some other function call: a slightly changed query starts behaving as expected, since math operations, casts and other functions are not pushed down (yet).
However, in Flink 1.18 math operations could be simplified away by the newer Calcite (CALCITE-4420), so each case needs to be checked individually. For example, with a no-op math operation:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col
FROM value_source AS S
JOIN jdbc for system_time as of S.proctime AS D
ON S.id = D.id and D.decimal_col = 0.0 + 0
{code}
or with a cast:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col
FROM value_source AS S
JOIN jdbc for system_time as of S.proctime AS D
ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)
{code}
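For completeness, a workaround can be pinned down with the same plan-test harness as above; a sketch (hypothetical test name, assuming the same {{value_source}} registration as in {{issue33365()}}):
{code:java}
@Test
public void issue33365Workaround() {
    // reuse the value_source table registered as in issue33365() above;
    // cast(...) is not pushed down (yet), so the decimal_col filter
    // should survive in the optimized plan as an explicit condition
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D"
                    + " ON S.id = D.id and D.decimal_col = cast(0.0 as decimal)");
}
{code}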
[~macdoor615] that might help you with your queries.
[~qingwei91] could you have a look here please, since you are aware of the current implementation?
> Missing filter condition in execution plan containing lookup join with mysql jdbc connector
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with
> flink-connector-jdbc-3.1.1-1.17.jar
> Reporter: macdoor615
> Assignee: david radley
> Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png,
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
> ip string,
> proctime as proctime()
> )
> WITH (
> 'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
> ip varchar(20),
> type int
> ); {code}
>
> Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a
> left join bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime
> on b.type = 0 and a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin],
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip
> AS VARCHAR(2147483647)) AS ip0])
> +- Calc(select=[ip, PROCTIME() AS proctime])
> +- TableSourceScan(table=[[default_catalog, default_database, a]],
> fields=[ip]){code}
>
> execute the same sql in sql-client with Flink 1.17.1 / 1.18.0 and
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin],
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
> +- Calc(select=[ip, PROCTIME() AS proctime])
> +- TableSourceScan(table=[[default_catalog, default_database, a]],
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar, the condition is
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar , the condition is
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>
> In our real-world production environment, this leads to incorrect data output.
>
>