qingwei91 commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1411250155


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##########
@@ -92,9 +104,18 @@ public JdbcRowDataLookupFunction(
                                 })
                         .toArray(DataType[]::new);
         this.maxRetryTimes = maxRetryTimes;
-        this.query =
+
+        final String baseSelectStatement =
                 options.getDialect()
                         .getSelectFromStatement(options.getTableName(), 
fieldNames, keyNames);
+        if (conditions == null || conditions.length == 0) {
+            this.query = baseSelectStatement;
+        } else {
+            this.query =
+                    baseSelectStatement
+                            + " AND "
+                            + 
Arrays.stream(conditions).collect(Collectors.joining(" AND "));

Review Comment:
   Do we need to enclose this condition with parenthesis to enforce precedence?



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java:
##########
@@ -276,9 +280,88 @@ public void testFilter() {
     @ParameterizedTest
     @EnumSource(Caching.class)
     void testLookupJoin(Caching caching) {
+        Collection<Row> dataToRegister =
+                Arrays.asList(
+                        Row.of(1L, "Alice"),
+                        Row.of(1L, "Alice"),
+                        Row.of(2L, "Bob"),
+                        Row.of(3L, "Charlie"));
+
+        String createTableStatement =
+                "CREATE TABLE value_source ( "
+                        + " `id` BIGINT, "
+                        + " `name` STRING, "
+                        + " `proctime` AS PROCTIME()"
+                        + ") WITH ("
+                        + " 'connector' = 'values', "
+                        + " 'data-id' = '%s'"
+                        + ")";
+        String selectStatement =
+                "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col 
FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of 
S.proctime AS D ON S.id = D.id";

Review Comment:
   This seems to not exercise the filter pushdown code? Can you change it such 
that it is being exercised?



##########
flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml:
##########
@@ -68,4 +68,29 @@ TableSourceScan(table=[[default_catalog, default_database, 
jdbc, filter=[and(OR(
 ]]>
        </Resource>
   </TestCase>
+    <TestCase name="testLookupJoin">
+        <Resource name="sql">
+            <![CDATA[SELECT * FROM a left join c FOR SYSTEM_TIME AS OF 
a.proctime on c.type = 0 and a.ip = c.ip]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 
1}])
+   :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+   +- LogicalFilter(condition=[AND(=($1, 0), =($cor0.ip, 
CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, c]])
+]]>
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.c], 
joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS 
INTEGER) AS type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0])

Review Comment:
   I might be missing something, as I dont see `c.type = 0` in the physical 
plan, I am not expert here though, wonder if  anyone has a clue?



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##########
@@ -92,9 +104,18 @@ public JdbcRowDataLookupFunction(
                                 })
                         .toArray(DataType[]::new);
         this.maxRetryTimes = maxRetryTimes;
-        this.query =
+
+        final String baseSelectStatement =
                 options.getDialect()
                         .getSelectFromStatement(options.getTableName(), 
fieldNames, keyNames);
+        if (conditions == null || conditions.length == 0) {
+            this.query = baseSelectStatement;
+        } else {
+            this.query =
+                    baseSelectStatement
+                            + " AND "

Review Comment:
   Why do we always chain with `AND`? is baseSelectStatement guaranteed to have 
a condition?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to