leonardBang commented on code in PR #20489:
URL: https://github.com/apache/flink/pull/20489#discussion_r940843642


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java:
##########
@@ -181,6 +179,27 @@ private JdbcDmlOptions getJdbcDmlOptions(
                 .build();
     }
 
+    @Nullable
+    private LookupCache getLookupCache(ReadableConfig tableOptions) {
+        LookupCache cache = null;
+        // Legacy cache options
+        if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+                && tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) 
> 0) {
+            cache =
+                    DefaultLookupCache.newBuilder()
+                            
.maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+                            
.expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
+                            
.cacheMissingKey(tableOptions.get(LOOKUP_CACHE_MISSING_KEY))
+                            .build();
+        }

Review Comment:
   After we add fallbackKeys, we can use new introducing options



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java:
##########
@@ -45,30 +43,29 @@
 /** Test suite for {@link JdbcRowDataLookupFunction}. */
 public class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
 
-    private static String[] fieldNames = new String[] {"id1", "id2", 
"comment1", "comment2"};
-    private static DataType[] fieldDataTypes =
+    private static final String[] fieldNames = new String[] {"id1", "id2", 
"comment1", "comment2"};
+    private static final DataType[] fieldDataTypes =
             new DataType[] {
                 DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()
             };
 
-    private static String[] lookupKeys = new String[] {"id1", "id2"};
-
-    @Test
-    public void testEval() throws Exception {
+    private static final String[] lookupKeys = new String[] {"id1", "id2"};
 
-        JdbcLookupOptions lookupOptions = JdbcLookupOptions.builder().build();
-        JdbcRowDataLookupFunction lookupFunction = 
buildRowDataLookupFunction(lookupOptions);
+    @ParameterizedTest(name = "withFailure = {0}")
+    @ValueSource(booleans = {false, true})
+    public void testLookup(boolean withFailure) throws Exception {
+        JdbcRowDataLookupFunction lookupFunction = 
buildRowDataLookupFunction(withFailure);
 
         ListOutputCollector collector = new ListOutputCollector();
         lookupFunction.setCollector(collector);
 
         lookupFunction.open(null);
 
         lookupFunction.eval(1, StringData.fromString("1"));
-
-        // close connection
-        lookupFunction.getDbConnection().close();
-
+        if (withFailure) {
+            // Close connection here, and this will be recovered by retry
+            lookupFunction.getDbConnection().close();

Review Comment:
   check null before call `close()`



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java:
##########
@@ -116,6 +117,8 @@ public class JdbcConnectorOptions {
     // Lookup options
     // 
-----------------------------------------------------------------------------------------
 
+    /** @deprecated please use {@link LookupOptions#PARTIAL_CACHE_MAX_ROWS} 
instead. */
+    @Deprecated
     public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
             ConfigOptions.key("lookup.cache.max-rows")

Review Comment:
   Could we add `lookup.cache.max-rows` to LookupOptions#PARTIAL_CACHE_MAX_ROWS 
's fallbackKeys ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to