wuchong commented on code in PR #20480:
URL: https://github.com/apache/flink/pull/20480#discussion_r939639534


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingAsyncLookupFunction.java:
##########
@@ -0,0 +1,95 @@
+package org.apache.flink.table.runtime.functions.table.lookup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+public class CachingAsyncLookupFunction extends AsyncLookupFunction {
+
+    // Constants
+    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";
+    private static final long UNINITIALIZED = -1;
+
+    // The actual user-provided lookup function
+    private final AsyncLookupFunction delegate;
+
+    private String cacheIdentifier;
+    private LookupCache cache;
+
+    // Cache metrics
+    private transient CacheMetricGroup cacheMetricGroup;
+    private transient Counter loadCounter;
+    private transient Counter numLoadFailuresCounter;
+    private volatile long latestLoadTime = UNINITIALIZED;
+
+    public CachingAsyncLookupFunction(LookupCache cache, AsyncLookupFunction delegate) {
+        this.cache = cache;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        // Get the shared cache from manager
+        cacheIdentifier = functionIdentifier();
+        cache = LookupCacheManager.getInstance().registerCacheIfAbsent(cacheIdentifier, cache);
+
+        // Register metrics
+        cacheMetricGroup =
+                new InternalCacheMetricGroup(
+                        context.getMetricGroup(), LOOKUP_CACHE_METRIC_GROUP_NAME);
+        loadCounter = new ThreadSafeSimpleCounter();
+        cacheMetricGroup.loadCounter(loadCounter);
+        numLoadFailuresCounter = new ThreadSafeSimpleCounter();
+        cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
+
+        cache.open(cacheMetricGroup);
+        delegate.open(context);
+    }
+
+    @Override
+    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+        Collection<RowData> cachedValues = cache.getIfPresent(keyRow);
+        if (cachedValues != null) {
+            return CompletableFuture.completedFuture(cachedValues);
+        } else {
+            return delegate.asyncLookup(keyRow)
+                    .whenComplete(
+                            (lookupValues, throwable) -> {
+                                if (throwable != null) {
+                                    numLoadFailuresCounter.inc();

Review Comment:
   It seems the `numLoadFailuresCounter` metric is not useful yet: any load failure currently fails the job, so the counter is reset on restart before it can accumulate anything. I think this metric will make sense once we support retrying failed loads.
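
   A minimal sketch of why the counter becomes meaningful with retries: each failed attempt can increment `numLoadFailuresCounter` without failing the job, so the value actually accumulates. The retry method and policy below are hypothetical, not part of this PR.

```java
// Hypothetical retry-aware load: numLoadFailuresCounter counts each failed
// attempt instead of being wiped out by a job failure and restart.
private CompletableFuture<Collection<RowData>> lookupWithRetry(RowData keyRow, int retriesLeft) {
    return delegate.asyncLookup(keyRow)
            .handle(
                    (lookupValues, throwable) -> {
                        if (throwable == null) {
                            return CompletableFuture.completedFuture(lookupValues);
                        }
                        numLoadFailuresCounter.inc(); // one increment per failed attempt
                        if (retriesLeft > 0) {
                            return lookupWithRetry(keyRow, retriesLeft - 1);
                        }
                        CompletableFuture<Collection<RowData>> failed = new CompletableFuture<>();
                        failed.completeExceptionally(throwable);
                        return failed;
                    })
            .thenCompose(future -> future); // flatten the nested future
}
```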



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -305,19 +320,147 @@ class AsyncLookupJoinITCase(
     fail("NumberFormatException is expected here!")
   }
 
+  @Test
+  def testLookupCacheSharingAcrossSubtasks(): Unit = {
+    if (!enableCache) {
+      return
+    }
+    // Keep the cache for later validation
+    LookupCacheManager.keepCacheOnRelease(true)
+    try {
+      // Use datagen source here to support parallel running
+      val sourceDdl =
+        s"""
+           |CREATE TABLE T (
+           |  id BIGINT,
+           |  proc AS PROCTIME()
+           |) WITH (
+           |  'connector' = 'datagen',
+           |  'fields.id.kind' = 'sequence',
+           |  'fields.id.start' = '1',
+           |  'fields.id.end' = '6'
+           |)
+           |""".stripMargin
+      tEnv.executeSql(sourceDdl)
+      val sql =
+        """
+          |SELECT T.id, D.name, D.age FROM T 
+          |LEFT JOIN user_table FOR SYSTEM_TIME AS OF T.proc AS D 
+          |ON T.id = D.id
+          |""".stripMargin
+      val sink = new TestingAppendSink
+      tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+      env.execute()
+
+      // Validate that only one cache is registered
+      val managedCaches = LookupCacheManager.getInstance().getManagedCaches
+      assertThat(managedCaches.size()).isEqualTo(1)
+
+      // Validate 6 entries are cached
+      val cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache
+      assertThat(cache.size()).isEqualTo(6)
+
+      // Validate contents of cached entries
+      assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(1L))))
+        .containsExactlyInAnyOrder(
+          GenericRowData.of(ji(11), jl(1L), BinaryStringData.fromString("Julian")))
+      assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(2L))))
+        .containsExactlyInAnyOrder(
+          GenericRowData.of(ji(22), jl(2L), BinaryStringData.fromString("Jark")))
+      assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(3L))))
+        .containsExactlyInAnyOrder(
+          GenericRowData.of(ji(33), jl(3L), BinaryStringData.fromString("Fabian")))
+      assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(4L)))).isEmpty()
+    } finally {
+      LookupCacheManager.getInstance().checkAllReleased()
+      LookupCacheManager.getInstance().clear()
+      LookupCacheManager.keepCacheOnRelease(false)
+    }
+  }
+
+  def ji(i: Int): java.lang.Integer = {
+    new java.lang.Integer(i)
+  }
+
+  def jl(l: Long): java.lang.Long = {
+    new java.lang.Long(l)
+  }
+
 }
 
 object AsyncLookupJoinITCase {
-  @Parameterized.Parameters(
-    name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, 
AsyncOutputMode={3}")
+
+  val LEGACY_TABLE_SOURCE: JBoolean = JBoolean.TRUE;
+  val DYNAMIC_TABLE_SOURCE: JBoolean = JBoolean.FALSE;
+  val ENABLE_OBJECT_REUSE: JBoolean = JBoolean.TRUE;
+  val DISABLE_OBJECT_REUSE: JBoolean = JBoolean.FALSE;
+  val ENABLE_CACHE: JBoolean = JBoolean.TRUE;
+  val DISABLE_CACHE: JBoolean = JBoolean.FALSE;
+
+  @Parameterized.Parameters(name =
+    "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, 
AsyncOutputMode={3}, EnableCache={4}")
   def parameters(): JCollection[Array[Object]] = {
     Seq[Array[AnyRef]](
-      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED),
-      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ALLOW_UNORDERED),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED)
+      Array(
+        LEGACY_TABLE_SOURCE,
+        HEAP_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        DISABLE_CACHE),
+      Array(
+        LEGACY_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        HEAP_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        HEAP_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        HEAP_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        ENABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        HEAP_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        ENABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        ENABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        ENABLE_CACHE)

Review Comment:
   I think we don't need to double the cases for dynamic table sources. Mixing the cache flag into the existing 4 dynamic-table-source cases should be enough.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.table.runtime.functions.table.lookup;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapper function around user-provided lookup function with a cache layer.
+ *
+ * <p>This function will check the cache on lookup request and return entries directly on cache
+ * hit, otherwise the function will invoke the actual lookup function, and store the entry into
+ * the cache after lookup for later use.
+ */
+@Internal
+public class CachingLookupFunction extends LookupFunction {
+
+    // Constants
+    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";
+    private static final long UNINITIALIZED = -1;
+
+    // The actual user-provided lookup function
+    private final LookupFunction delegate;
+
+    private LookupCache cache;
+
+    // Cache metrics
+    private transient String cacheIdentifier;
+    private transient CacheMetricGroup cacheMetricGroup;

Review Comment:
   This can be a local variable?
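
   A sketch of what that would look like: the group is only consumed while registering metrics and opening the cache, so it can live as a local in `open()`. The body is adapted from the async variant shown above; the sync class's actual `open()` isn't visible in this hunk.

```java
@Override
public void open(FunctionContext context) throws Exception {
    // Local instead of a field: nothing outside open() needs the group.
    // (Cache registration via LookupCacheManager omitted for brevity.)
    CacheMetricGroup cacheMetricGroup =
            new InternalCacheMetricGroup(context.getMetricGroup(), LOOKUP_CACHE_METRIC_GROUP_NAME);
    loadCounter = new SimpleCounter();
    cacheMetricGroup.loadCounter(loadCounter);
    numLoadFailuresCounter = new SimpleCounter();
    cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
    cache.open(cacheMetricGroup);
    delegate.open(context);
}
```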



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.table.runtime.functions.table.lookup;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapper function around user-provided lookup function with a cache layer.
+ *
+ * <p>This function will check the cache on lookup request and return entries directly on cache
+ * hit, otherwise the function will invoke the actual lookup function, and store the entry into
+ * the cache after lookup for later use.
+ */
+@Internal
+public class CachingLookupFunction extends LookupFunction {
+
+    // Constants
+    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";
+    private static final long UNINITIALIZED = -1;
+
+    // The actual user-provided lookup function
+    private final LookupFunction delegate;
+
+    private LookupCache cache;
+
+    // Cache metrics
+    private transient String cacheIdentifier;
+    private transient CacheMetricGroup cacheMetricGroup;
+    private transient Counter loadCounter;
+    private transient Counter numLoadFailuresCounter;
+    private volatile long latestLoadTime = UNINITIALIZED;

Review Comment:
   This is never used?
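
   If the field is meant to stay, a sketch of the wiring it would need; the gauge registration assumes `CacheMetricGroup#latestLoadTimeGauge` as described in FLIP-221, and `lookupByDelegate` is a hypothetical helper:

```java
// In open():
//     cacheMetricGroup.latestLoadTimeGauge(() -> latestLoadTime);

// Measure each load against the external system:
private Collection<RowData> lookupByDelegate(RowData keyRow) throws IOException {
    long loadStartMs = System.currentTimeMillis();
    Collection<RowData> lookupValues = delegate.lookup(keyRow); // actual external lookup
    latestLoadTime = System.currentTimeMillis() - loadStartMs;
    return lookupValues;
}
```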



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.table.runtime.functions.table.lookup;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapper function around user-provided lookup function with a cache layer.
+ *
+ * <p>This function will check the cache on lookup request and return entries directly on cache
+ * hit, otherwise the function will invoke the actual lookup function, and store the entry into
+ * the cache after lookup for later use.
+ */
+@Internal
+public class CachingLookupFunction extends LookupFunction {

Review Comment:
   Declare `serialVersionUID` field.
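
   For reference, the conventional declaration on Flink's serializable function classes:

```java
private static final long serialVersionUID = 1L;
```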



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingAsyncLookupFunction.java:
##########
@@ -0,0 +1,95 @@
+package org.apache.flink.table.runtime.functions.table.lookup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+public class CachingAsyncLookupFunction extends AsyncLookupFunction {
+
+    // Constants
+    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";
+    private static final long UNINITIALIZED = -1;
+
+    // The actual user-provided lookup function
+    private final AsyncLookupFunction delegate;
+
+    private String cacheIdentifier;
+    private LookupCache cache;
+
+    // Cache metrics
+    private transient CacheMetricGroup cacheMetricGroup;
+    private transient Counter loadCounter;
+    private transient Counter numLoadFailuresCounter;
+    private volatile long latestLoadTime = UNINITIALIZED;
+
+    public CachingAsyncLookupFunction(LookupCache cache, AsyncLookupFunction delegate) {
+        this.cache = cache;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        // Get the shared cache from manager
+        cacheIdentifier = functionIdentifier();
+        cache = LookupCacheManager.getInstance().registerCacheIfAbsent(cacheIdentifier, cache);
+
+        // Register metrics
+        cacheMetricGroup =
+                new InternalCacheMetricGroup(
+                        context.getMetricGroup(), LOOKUP_CACHE_METRIC_GROUP_NAME);
+        loadCounter = new ThreadSafeSimpleCounter();
+        cacheMetricGroup.loadCounter(loadCounter);
+        numLoadFailuresCounter = new ThreadSafeSimpleCounter();
+        cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
+
+        cache.open(cacheMetricGroup);
+        delegate.open(context);
+    }
+
+    @Override
+    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+        Collection<RowData> cachedValues = cache.getIfPresent(keyRow);
+        if (cachedValues != null) {
+            return CompletableFuture.completedFuture(cachedValues);
+        } else {
+            return delegate.asyncLookup(keyRow)
+                    .whenComplete(
+                            (lookupValues, throwable) -> {
+                                if (throwable != null) {
+                                    numLoadFailuresCounter.inc();
+                                    throw new RuntimeException(
+                                            String.format("Failed to lookup key '%s'", keyRow),
+                                            throwable);
+                                }
+                                Collection<RowData> cachingValues = lookupValues;
+                                if (lookupValues == null || lookupValues.isEmpty()) {
+                                    cachingValues = Collections.emptyList();
+                                }
+                                cache.put(keyRow, cachingValues);

Review Comment:
   Add a note to the Javadoc of `org.apache.flink.table.functions.AsyncLookupFunction#asyncLookup` saying that the returned `Collection` of `RowData` shouldn't be reused or mutated, since it may be stored into the cache. The same applies to `LookupFunction#lookup`.
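
   A sketch of how the suggested note might read on `AsyncLookupFunction#asyncLookup` (the same wording would go on `LookupFunction#lookup`); the surrounding Javadoc text is illustrative:

```java
/**
 * Asynchronously looks up rows matching the given key row.
 *
 * <p>Note: the returned {@link Collection} of {@link RowData} may be stored into the lookup
 * cache and kept by the framework, so implementations should not reuse or mutate the collection
 * after returning it.
 */
public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);
```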



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingAsyncLookupFunction.java:
##########
@@ -0,0 +1,95 @@
+package org.apache.flink.table.runtime.functions.table.lookup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+public class CachingAsyncLookupFunction extends AsyncLookupFunction {

Review Comment:
   Declare `serialVersionUID` field.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingAsyncLookupFunction.java:
##########
@@ -0,0 +1,95 @@
+package org.apache.flink.table.runtime.functions.table.lookup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+public class CachingAsyncLookupFunction extends AsyncLookupFunction {
+
+    // Constants
+    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";
+    private static final long UNINITIALIZED = -1;
+
+    // The actual user-provided lookup function
+    private final AsyncLookupFunction delegate;
+
+    private String cacheIdentifier;
+    private LookupCache cache;
+
+    // Cache metrics
+    private transient CacheMetricGroup cacheMetricGroup;
+    private transient Counter loadCounter;
+    private transient Counter numLoadFailuresCounter;
+    private volatile long latestLoadTime = UNINITIALIZED;
+
+    public CachingAsyncLookupFunction(LookupCache cache, AsyncLookupFunction delegate) {
+        this.cache = cache;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        // Get the shared cache from manager
+        cacheIdentifier = functionIdentifier();
+        cache = LookupCacheManager.getInstance().registerCacheIfAbsent(cacheIdentifier, cache);
+
+        // Register metrics
+        cacheMetricGroup =
+                new InternalCacheMetricGroup(
+                        context.getMetricGroup(), LOOKUP_CACHE_METRIC_GROUP_NAME);
+        loadCounter = new ThreadSafeSimpleCounter();
+        cacheMetricGroup.loadCounter(loadCounter);

Review Comment:
   The `loadCounter` metric is registered but never incremented.
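
   A sketch of where the increment could live, next to the existing failure accounting in the completion callback (counting failed attempts as loads too is an assumption, not settled by this PR):

```java
return delegate.asyncLookup(keyRow)
        .whenComplete(
                (lookupValues, throwable) -> {
                    loadCounter.inc(); // every attempt to load from the external system
                    if (throwable != null) {
                        numLoadFailuresCounter.inc();
                        // ... existing failure handling ...
                    }
                    // ... existing caching of lookupValues ...
                });
```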



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingAsyncLookupFunction.java:
##########
@@ -0,0 +1,95 @@
+package org.apache.flink.table.runtime.functions.table.lookup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+public class CachingAsyncLookupFunction extends AsyncLookupFunction {

Review Comment:
   Missing Javadoc and license. 
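
   For reference, the ASF header carried by every Flink source file, plus a class-level Javadoc mirroring the sync variant's:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/** A wrapper function around user-provided async lookup function with a cache layer. */
```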


