davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1463160400


##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##########
@@ -82,6 +103,121 @@ void testLookup(boolean withFailure) throws Exception {
         assertThat(result).isEqualTo(expected);
     }
 
+    @ParameterizedTest
+    @MethodSource("lookupWithPredicatesProvider")
+    public void testEval(TestSpec testSpec) throws Exception {
+        JdbcRowDataLookupFunction lookupFunction =
+                buildRowDataLookupFunctionWithPredicates(
+                        testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+        lookupFunction.open(null);
+        lookupFunction.eval(testSpec.keys);
+
+        if (testSpec.withFailure) {
+            // Close connection here, and this will be recovered by retry
+            if (lookupFunction.getDbConnection() != null) {
+                lookupFunction.getDbConnection().close();
+            }
+        }
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+        Collections.sort(testSpec.expected);
+        assertThat(result).isEqualTo(testSpec.expected);
+    }
+
+    private static class TestSpec {
+
+        private boolean withFailure;
+        private final List<String> resolvedPredicates;
+        private final Serializable[] pushdownParams;
+        private final Object[] keys;
+        private List<String> expected;
+
+        private TestSpec(
+                boolean withFailure,
+                List<String> resolvedPredicates,
+                Serializable[] pushdownParams,
+                Object[] keys,
+                List<String> expected) {
+            this.withFailure = withFailure;
+            this.resolvedPredicates = resolvedPredicates;
+            this.pushdownParams = pushdownParams;
+            this.keys = keys;
+            this.expected = expected;
+        }
+    }
+
+    static Collection<TestSpec> lookupWithPredicatesProvider() {
+        return ImmutableList.<TestSpec>builder()
+                .addAll(getTestSpecs(true))
+                .addAll(getTestSpecs(false))
+                .build();
+    }
+
+    @NotNull
+    private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+        return ImmutableList.of(
+                // var char single filter
+                new TestSpec(
+                        withFailure,
+                        Arrays.asList(new String[] {"(comment1 = ?)"}),
+                        new Serializable[] {"11-c1-v1"},
+                        new Object[] {1, StringData.fromString("1")},
+                        Arrays.asList(new String[] {"+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)"})),

Review Comment:
   Sorry, my bad - you are right. I have fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to