qingwei91 commented on code in PR #79:
URL:
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1411250155
##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##########
@@ -92,9 +104,18 @@ public JdbcRowDataLookupFunction(
})
.toArray(DataType[]::new);
this.maxRetryTimes = maxRetryTimes;
- this.query =
+
+ final String baseSelectStatement =
options.getDialect()
.getSelectFromStatement(options.getTableName(),
fieldNames, keyNames);
+ if (conditions == null || conditions.length == 0) {
+ this.query = baseSelectStatement;
+ } else {
+ this.query =
+ baseSelectStatement
+ + " AND "
+ +
Arrays.stream(conditions).collect(Collectors.joining(" AND "));
Review Comment:
Do we need to enclose this condition with parenthesis to enforce precedence?
##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java:
##########
@@ -276,9 +280,88 @@ public void testFilter() {
@ParameterizedTest
@EnumSource(Caching.class)
void testLookupJoin(Caching caching) {
+ Collection<Row> dataToRegister =
+ Arrays.asList(
+ Row.of(1L, "Alice"),
+ Row.of(1L, "Alice"),
+ Row.of(2L, "Bob"),
+ Row.of(3L, "Charlie"));
+
+ String createTableStatement =
+ "CREATE TABLE value_source ( "
+ + " `id` BIGINT, "
+ + " `name` STRING, "
+ + " `proctime` AS PROCTIME()"
+ + ") WITH ("
+ + " 'connector' = 'values', "
+ + " 'data-id' = '%s'"
+ + ")";
+ String selectStatement =
+ "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col
FROM value_source"
+ + " AS S JOIN jdbc_lookup for system_time as of
S.proctime AS D ON S.id = D.id";
Review Comment:
This seems to not exercise the filter pushdown code? Can you change it such
that it is being exercised?
##########
flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml:
##########
@@ -68,4 +68,29 @@ TableSourceScan(table=[[default_catalog, default_database,
jdbc, filter=[and(OR(
]]>
</Resource>
</TestCase>
+ <TestCase name="testLookupJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM a left join c FOR SYSTEM_TIME AS OF
a.proctime on c.type = 0 and a.ip = c.ip]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
1}])
+ :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+ +- LogicalFilter(condition=[AND(=($1, 0), =($cor0.ip,
CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, c]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.c],
joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS
INTEGER) AS type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0])
Review Comment:
I might be missing something, as I dont see `c.type = 0` in the physical
plan, I am not expert here though, wonder if anyone has a clue?
##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##########
@@ -92,9 +104,18 @@ public JdbcRowDataLookupFunction(
})
.toArray(DataType[]::new);
this.maxRetryTimes = maxRetryTimes;
- this.query =
+
+ final String baseSelectStatement =
options.getDialect()
.getSelectFromStatement(options.getTableName(),
fieldNames, keyNames);
+ if (conditions == null || conditions.length == 0) {
+ this.query = baseSelectStatement;
+ } else {
+ this.query =
+ baseSelectStatement
+ + " AND "
Review Comment:
Why do we always chain with `AND`? is baseSelectStatement guaranteed to have
a condition?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]