wuchong commented on code in PR #20439:
URL: https://github.com/apache/flink/pull/20439#discussion_r938567609


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/LookupCacheManager.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Managing shared caches across different subtasks.
+ *
+ * <p>In order to reduce the memory usage of cache, different subtasks of the same lookup join
+ * runner will share the same cache instance. Caches are managed by the identifier of the lookup
+ * table for which it is serving.
+ */
+@Internal
+public class LookupCacheManager {

Review Comment:
   It seems this is only used internally (in flink-table-runtime). Why put it in a 
common module?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -191,4 +197,113 @@ public static UserDefinedFunction getLookupFunction(
                         "table %s is neither TableSourceTable not LegacyTableSourceTable",
                         temporalTable.getQualifiedName()));
     }
+
+    public static Optional<LookupCacheHandler> getPartialLookupCacheHandler(
+            RelOptTable temporalTable,
+            ReadableConfig tableConfig,
+            ClassLoader classLoader,
+            RowType leftTableRowType,
+            RowType rightTableRowType,
+            Map<Integer, LookupKey> lookupKeys,
+            boolean enableObjectReuse) {
+        // Legacy table source does not support lookup caching
+        if (temporalTable instanceof LegacyTableSourceTable) {
+            return Optional.empty();
+        }
+        LookupTableSource lookupTableSource = getLookupTableSource(temporalTable);
+        LookupTableSource.LookupRuntimeProvider provider =
+                lookupTableSource.getLookupRuntimeProvider(
+                        createLookupContext(lookupKeys.keySet()));
+        if (provider instanceof PartialCachingLookupProvider) {
+            LookupCache cache = ((PartialCachingLookupProvider) provider).getCache();
+            if (cache == null) {
+                return Optional.empty();

Review Comment:
   Shall we allow a `null` cache for `PartialCachingLookupProvider`? 
   1. Users should use `LookupFunctionProvider` if no cache is provided. 
   2. `PartialCachingLookupProvider#getCache()` is not annotated `@Nullable`. 
   3. I think we also don't allow null `getScanRuntimeProvider` and 
`getCacheReloadTrigger` for `FullCachingLookupProvider`.
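
The first two points could be enforced at construction time. Below is a minimal sketch, assuming a hypothetical `PartialCachingProviderSketch` class with generic placeholders `F` and `C` standing in for the lookup function and `LookupCache`; it is not the actual Flink interface, only an illustration of failing fast on a `null` cache.

```java
import java.util.Objects;

/** Hypothetical stand-in for a partial-caching provider; F and C replace the real Flink types. */
final class PartialCachingProviderSketch<F, C> {
    private final F lookupFunction;
    private final C cache;

    private PartialCachingProviderSketch(F lookupFunction, C cache) {
        this.lookupFunction = lookupFunction;
        this.cache = cache;
    }

    /** Rejects a null cache when the provider is created instead of tolerating it later. */
    static <F, C> PartialCachingProviderSketch<F, C> of(F lookupFunction, C cache) {
        return new PartialCachingProviderSketch<>(
                Objects.requireNonNull(lookupFunction, "lookupFunction must not be null"),
                Objects.requireNonNull(cache, "cache must not be null"));
    }

    F getLookupFunction() {
        return lookupFunction;
    }

    /** Never returns null, so callers do not need a null branch. */
    C getCache() {
        return cache;
    }
}
```

With such a guarantee, the `cache == null` branches in `getPartialLookupCacheHandler` would become unnecessary.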



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -191,4 +197,113 @@ public static UserDefinedFunction getLookupFunction(
                         "table %s is neither TableSourceTable not LegacyTableSourceTable",
                         temporalTable.getQualifiedName()));
     }
+
+    public static Optional<LookupCacheHandler> getPartialLookupCacheHandler(
+            RelOptTable temporalTable,
+            ReadableConfig tableConfig,
+            ClassLoader classLoader,
+            RowType leftTableRowType,
+            RowType rightTableRowType,
+            Map<Integer, LookupKey> lookupKeys,
+            boolean enableObjectReuse) {
+        // Legacy table source does not support lookup caching
+        if (temporalTable instanceof LegacyTableSourceTable) {
+            return Optional.empty();
+        }
+        LookupTableSource lookupTableSource = getLookupTableSource(temporalTable);
+        LookupTableSource.LookupRuntimeProvider provider =
+                lookupTableSource.getLookupRuntimeProvider(
+                        createLookupContext(lookupKeys.keySet()));
+        if (provider instanceof PartialCachingLookupProvider) {
+            LookupCache cache = ((PartialCachingLookupProvider) provider).getCache();
+            if (cache == null) {
+                return Optional.empty();
+            } else {
+                return buildLookupCacheHandler(
+                        temporalTable,
+                        tableConfig,
+                        classLoader,
+                        leftTableRowType,
+                        rightTableRowType,
+                        lookupKeys,
+                        enableObjectReuse,
+                        cache);
+            }
+        } else if (provider instanceof PartialCachingAsyncLookupProvider) {
+            LookupCache cache = ((PartialCachingAsyncLookupProvider) provider).getCache();
+            if (cache == null) {
+                return Optional.empty();
+            } else {
+                return buildLookupCacheHandler(
+                        temporalTable,
+                        tableConfig,
+                        classLoader,
+                        leftTableRowType,
+                        rightTableRowType,
+                        lookupKeys,
+                        enableObjectReuse,
+                        cache);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static Optional<LookupCacheHandler> buildLookupCacheHandler(
+            RelOptTable temporalTable,
+            ReadableConfig tableConfig,
+            ClassLoader classLoader,
+            RowType leftTableRowType,
+            RowType rightTableRowType,
+            Map<Integer, LookupKey> lookupKeys,
+            boolean enableObjectReuse,
+            LookupCache cache) {
+        RowType keyRowType =
+                RowType.of(
+                        lookupKeys.values().stream()
+                                .filter(key -> key instanceof FieldRefLookupKey)
+                                .map(key -> ((FieldRefLookupKey) key).index)
+                                .map(leftTableRowType::getTypeAt)
+                                .toArray(LogicalType[]::new));
+        if (enableObjectReuse) {
+            return Optional.of(
+                    new LookupCacheHandler(
+                            cache,
+                            StringUtils.join(temporalTable.getQualifiedName(), "."),
+                            LookupJoinCodeGenerator.generateLeftTableKeyProjection(
+                                    tableConfig,
+                                    classLoader,
+                                    leftTableRowType,
+                                    keyRowType,
+                                    lookupKeys)));
+        } else {
+            return Optional.of(
+                    new LookupCacheHandler(
+                            cache,
+                            StringUtils.join(temporalTable.getQualifiedName(), "."),
+                            LookupJoinCodeGenerator.generateLeftTableKeyProjection(
+                                    tableConfig,
+                                    classLoader,
+                                    leftTableRowType,
+                                    keyRowType,
+                                    lookupKeys),
+                            new RowDataSerializer(keyRowType),

Review Comment:
   `cacheKeySerializer` is not needed when we use the above key selector. 



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/LookupCacheManager.java:
##########
@@ -0,0 +1,68 @@
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Managing shared caches across different subtasks.
+ *
+ * <p>In order to reduce the memory usage of cache, different subtasks of the same lookup join
+ * runner will share the same cache instance. Caches are managed by the identifier of the lookup
+ * table for which it is serving.
+ */
+@Internal
+public class LookupCacheManager {
+    private static LookupCacheManager instance;
+    private final Map<String, LookupCache> cachesByTableIdentifier = new HashMap<>();
+
+    /** Get the shared instance of {@link LookupCacheManager}. */
+    public static synchronized LookupCacheManager getInstance() {
+        if (instance == null) {
+            instance = new LookupCacheManager();
+        }
+        return instance;
+    }
+
+    /**
+     * Register a cache instance with table identifier to the manager.
+     *
+     * <p>If the cache with the given table identifier is already registered in the manager, this
+     * method will return the registered one, otherwise this method will register the given cache
+     * into the manager then return.
+     *
+     * @param tableIdentifier table identifier
+     * @param cache instance of cache trying to register
+     * @return instance of the shared cache
+     */
+    public synchronized LookupCache registerCache(String tableIdentifier, LookupCache cache) {
+        checkNotNull(cache, "Could not register null cache in the manager");
+        if (cachesByTableIdentifier.containsKey(tableIdentifier)) {
+            return cachesByTableIdentifier.get(tableIdentifier);
+        } else {
+            cachesByTableIdentifier.put(tableIdentifier, cache);
+            return cache;
+        }
+    }

Review Comment:
   I have several concerns about this method:
   
   1. The same dim table can be used multiple times in a SQL query with 
different table options (e.g. cache TTL). I think we shouldn't reuse the cache if 
the configuration differs. That means we may need to introduce a 
`LookupCache#getIdentifier()` interface to get an identifier of a specific 
cache. 
   2. `tableIdentifier` is not enough to identify a cache, because in session 
mode, different jobs may use the same table identifier to refer to different 
external tables. Maybe we should add the JobID as part of the cache id.  
   3. Therefore, the final registered key should be a composite of JobID, table 
identifier, and cache identifier.
   4. From the method signature, the `cache` parameter is the one registered; 
however, it may not be. Maybe `registerCacheIfAbsent` would be a better name (similar 
to `Map#putIfAbsent`).
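
The four points above could be sketched roughly as follows. This is only an illustration under stated assumptions: `CacheKey` and `SharedCacheRegistry` are hypothetical names, the generic parameter `C` stands in for `LookupCache`, the job ID is modeled as a plain string, and the cache identifier is whatever the proposed (not yet existing) `LookupCache#getIdentifier()` would return.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical composite key: job ID + table identifier + cache identifier. */
final class CacheKey {
    private final String jobId;
    private final String tableIdentifier;
    private final String cacheIdentifier;

    CacheKey(String jobId, String tableIdentifier, String cacheIdentifier) {
        this.jobId = Objects.requireNonNull(jobId);
        this.tableIdentifier = Objects.requireNonNull(tableIdentifier);
        this.cacheIdentifier = Objects.requireNonNull(cacheIdentifier);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof CacheKey)) {
            return false;
        }
        CacheKey other = (CacheKey) o;
        return jobId.equals(other.jobId)
                && tableIdentifier.equals(other.tableIdentifier)
                && cacheIdentifier.equals(other.cacheIdentifier);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobId, tableIdentifier, cacheIdentifier);
    }
}

/** Simplified stand-in for the manager; C replaces LookupCache. */
final class SharedCacheRegistry<C> {
    private final Map<CacheKey, C> caches = new HashMap<>();

    /** Returns the cache already registered under the key, or registers and returns the given one. */
    synchronized C registerCacheIfAbsent(CacheKey key, C cache) {
        Objects.requireNonNull(cache, "Could not register null cache in the manager");
        C existing = caches.putIfAbsent(key, cache);
        return existing != null ? existing : cache;
    }
}
```

In this sketch, two lookups of the same table within one job share a cache only when their cache identifiers match, and the same table name in a different job maps to a separate entry.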



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupCacheHandler.java:
##########
@@ -0,0 +1,125 @@
+
+package org.apache.flink.table.runtime.operators.join.lookup;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCacheManager;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A helper class providing utils for accessing {@link LookupCache}. */
+@Internal
+public class LookupCacheHandler implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String tableIdentifier;
+
+    // A projection which convert the input from left table to a row that only keeps the joining
+    // keys. The cache should only store the joining keys as the cache key.
+    private final GeneratedProjection generatedCacheKeyProjection;
+
+    // Serializer for deep copy the key and value to be stored in the cache.
+    // Make as nullable since copying is only required when object reuse is enabled.
+    @Nullable private final RowDataSerializer cacheKeySerializer;
+    @Nullable private final RowDataSerializer cacheValueSerializer;
+
+    private LookupCache cache;

Review Comment:
   `transient`?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -308,16 +318,78 @@ class AsyncLookupJoinITCase(
 }
 
 object AsyncLookupJoinITCase {
-  @Parameterized.Parameters(
-    name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, AsyncOutputMode={3}")
+
+  val LEGACY_TABLE_SOURCE: JBoolean = JBoolean.TRUE;
+  val DYNAMIC_TABLE_SOURCE: JBoolean = JBoolean.FALSE;
+  val ENABLE_OBJECT_REUSE: JBoolean = JBoolean.TRUE;
+  val DISABLE_OBJECT_REUSE: JBoolean = JBoolean.FALSE;
+  val ENABLE_CACHE: JBoolean = JBoolean.TRUE;
+  val DISABLE_CACHE: JBoolean = JBoolean.FALSE;
+
+  @Parameterized.Parameters(name =
+    "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, AsyncOutputMode={3}, EnableCache={4}")
   def parameters(): JCollection[Array[Object]] = {
     Seq[Array[AnyRef]](
-      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED),
-      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ALLOW_UNORDERED),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED)
+      Array(
+        LEGACY_TABLE_SOURCE,
+        HEAP_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        DISABLE_CACHE),

Review Comment:
   I don't think we need to double the cases for dynamic table sources. Mixing 
cache cases into the existing 4 cases (for dynamic table sources) should be enough. 



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupCacheHandler.java:
##########
@@ -0,0 +1,125 @@
+
+package org.apache.flink.table.runtime.operators.join.lookup;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCacheManager;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A helper class providing utils for accessing {@link LookupCache}. */
+@Internal
+public class LookupCacheHandler implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String tableIdentifier;
+
+    // A projection which convert the input from left table to a row that only keeps the joining
+    // keys. The cache should only store the joining keys as the cache key.
+    private final GeneratedProjection generatedCacheKeyProjection;
+
+    // Serializer for deep copy the key and value to be stored in the cache.
+    // Make as nullable since copying is only required when object reuse is enabled.
+    @Nullable private final RowDataSerializer cacheKeySerializer;
+    @Nullable private final RowDataSerializer cacheValueSerializer;
+
+    private LookupCache cache;
+    private boolean initialized = false;
+
+    // The runtime instance of the cache key projection
+    private transient Projection<RowData, RowData> cacheKeyProjection;
+
+    public LookupCacheHandler(
+            LookupCache cache,
+            String tableIdentifier,
+            GeneratedProjection generatedCacheKeyProjection) {
+        this(cache, tableIdentifier, generatedCacheKeyProjection, null, null);
+    }
+
+    public LookupCacheHandler(
+            LookupCache cache,
+            String tableIdentifier,
+            GeneratedProjection generatedCacheKeyProjection,
+            @Nullable RowDataSerializer cacheKeySerializer,
+            @Nullable RowDataSerializer cacheValueSerializer) {
+        this.cache = cache;
+        this.tableIdentifier = tableIdentifier;
+        this.generatedCacheKeyProjection = checkNotNull(generatedCacheKeyProjection);
+        this.cacheKeySerializer = cacheKeySerializer;
+        this.cacheValueSerializer = cacheValueSerializer;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void open(CacheMetricGroup metricGroup, ClassLoader classLoader) {
+        // Try to register the holding cache into the manager, and the manager will return the
+        // actual shared cache to use
+        cache = LookupCacheManager.getInstance().registerCache(tableIdentifier, cache);
+        cache.open(metricGroup);
+        cacheKeyProjection = generatedCacheKeyProjection.newInstance(classLoader);
+        initialized = true;
+    }
+
+    /** Project the input row from left table to get the key row. */
+    public RowData getKeyRowFromInput(RowData in) {
+        RowData keyRow = cacheKeyProjection.apply(in);
+        keyRow.setRowKind(RowKind.INSERT);
+        return keyRow;
+    }
+
+    /**
+     * Put the key value pair into the cache. If object reuse is enabled, copy the key value pair
+     * before storing into the cache.
+     */
+    public void maybeCopyThenPut(RowData key, Collection<RowData> value) {
+        if (cacheKeySerializer != null && cacheValueSerializer != null) {
+            RowData copiesKey = cacheKeySerializer.copy(key);
+            Collection<RowData> copiedValues =
+                    value.stream()
+                            .map(cacheValueSerializer::copy)
+                            .collect(Collectors.toCollection(ArrayList::new));
+            getCache().put(copiesKey, copiedValues);

Review Comment:
   Why copy?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinRunner.java:
##########
@@ -141,13 +166,22 @@ public void close() throws Exception {
                 rf.close();
             }
         }
+        if (cacheHandler != null) {
+            cacheHandler.getCache().close();

Review Comment:
   We should unregister the cache as well; otherwise, this leaks memory. That means 
`LookupCacheManager` may need to track a reference count. 
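
One possible shape for such reference counting is sketched below. All names here (`ClosableCacheSketch`, `RefCountedCacheRegistry`, `acquire`, `release`) are hypothetical and chosen only for illustration; the real `LookupCache` and `LookupCacheManager` signatures may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal cache contract used only by this sketch. */
interface ClosableCacheSketch {
    void close();
}

/** Registry that closes and forgets a shared cache once its last user releases it. */
final class RefCountedCacheRegistry {

    private static final class Entry {
        final ClosableCacheSketch cache;
        int refCount;

        Entry(ClosableCacheSketch cache) {
            this.cache = cache;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Registers the cache if absent, bumps the reference count, and returns the shared instance. */
    synchronized ClosableCacheSketch acquire(String cacheId, ClosableCacheSketch cache) {
        Entry entry = entries.computeIfAbsent(cacheId, id -> new Entry(cache));
        entry.refCount++;
        return entry.cache;
    }

    /** Drops one reference; the last release removes the entry and closes the cache. */
    synchronized void release(String cacheId) {
        Entry entry = entries.get(cacheId);
        if (entry == null) {
            return;
        }
        entry.refCount--;
        if (entry.refCount == 0) {
            entries.remove(cacheId);
            entry.cache.close();
        }
    }

    synchronized int size() {
        return entries.size();
    }
}
```

Under this scheme each runner would call `acquire` in `open()` and `release` in `close()`, rather than closing the shared cache directly.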



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -191,4 +197,113 @@ public static UserDefinedFunction getLookupFunction(
                         "table %s is neither TableSourceTable not LegacyTableSourceTable",
                         temporalTable.getQualifiedName()));
     }
+
+    public static Optional<LookupCacheHandler> getPartialLookupCacheHandler(
+            RelOptTable temporalTable,
+            ReadableConfig tableConfig,
+            ClassLoader classLoader,
+            RowType leftTableRowType,
+            RowType rightTableRowType,
+            Map<Integer, LookupKey> lookupKeys,
+            boolean enableObjectReuse) {
+        // Legacy table source does not support lookup caching
+        if (temporalTable instanceof LegacyTableSourceTable) {
+            return Optional.empty();
+        }
+        LookupTableSource lookupTableSource = getLookupTableSource(temporalTable);
+        LookupTableSource.LookupRuntimeProvider provider =
+                lookupTableSource.getLookupRuntimeProvider(
+                        createLookupContext(lookupKeys.keySet()));
+        if (provider instanceof PartialCachingLookupProvider) {
+            LookupCache cache = ((PartialCachingLookupProvider) provider).getCache();
+            if (cache == null) {
+                return Optional.empty();
+            } else {
+                return buildLookupCacheHandler(
+                        temporalTable,
+                        tableConfig,
+                        classLoader,
+                        leftTableRowType,
+                        rightTableRowType,
+                        lookupKeys,
+                        enableObjectReuse,
+                        cache);
+            }
+        } else if (provider instanceof PartialCachingAsyncLookupProvider) {
+            LookupCache cache = ((PartialCachingAsyncLookupProvider) provider).getCache();
+            if (cache == null) {
+                return Optional.empty();
+            } else {
+                return buildLookupCacheHandler(
+                        temporalTable,
+                        tableConfig,
+                        classLoader,
+                        leftTableRowType,
+                        rightTableRowType,
+                        lookupKeys,
+                        enableObjectReuse,
+                        cache);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static Optional<LookupCacheHandler> buildLookupCacheHandler(
+            RelOptTable temporalTable,
+            ReadableConfig tableConfig,
+            ClassLoader classLoader,
+            RowType leftTableRowType,
+            RowType rightTableRowType,
+            Map<Integer, LookupKey> lookupKeys,
+            boolean enableObjectReuse,
+            LookupCache cache) {
+        RowType keyRowType =
+                RowType.of(
+                        lookupKeys.values().stream()
+                                .filter(key -> key instanceof FieldRefLookupKey)
+                                .map(key -> ((FieldRefLookupKey) key).index)
+                                .map(leftTableRowType::getTypeAt)
+                                .toArray(LogicalType[]::new));
+        if (enableObjectReuse) {
+            return Optional.of(
+                    new LookupCacheHandler(
+                            cache,
+                            StringUtils.join(temporalTable.getQualifiedName(), "."),
+                            LookupJoinCodeGenerator.generateLeftTableKeyProjection(
+                                    tableConfig,
+                                    classLoader,
+                                    leftTableRowType,
+                                    keyRowType,
+                                    lookupKeys)));
+        } else {
+            return Optional.of(
+                    new LookupCacheHandler(
+                            cache,
+                            StringUtils.join(temporalTable.getQualifiedName(), "."),
+                            LookupJoinCodeGenerator.generateLeftTableKeyProjection(
+                                    tableConfig,
+                                    classLoader,
+                                    leftTableRowType,
+                                    keyRowType,
+                                    lookupKeys),

Review Comment:
   Please use 
`org.apache.flink.table.planner.plan.utils.KeySelectorUtil#getRowDataSelector` 
to create the key selector/projection, which guarantees that it always generates 
`BinaryRowData`, giving consistent `hashCode()` and `equals()` behavior. 
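
To illustrate why a single key representation matters for a hash-based cache, here is a small sketch with two hypothetical key classes. `GenericKeySketch` and `BinaryKeySketch` are stand-ins invented for this example, not Flink's row classes. Each defines equality only against its own type, so the same logical key in a different representation misses the cache.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical object-array-backed key; equal only to other GenericKeySketch instances. */
final class GenericKeySketch {
    private final Object[] fields;

    GenericKeySketch(Object... fields) {
        this.fields = fields.clone();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof GenericKeySketch
                && Arrays.equals(fields, ((GenericKeySketch) o).fields);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(fields);
    }
}

/** Hypothetical byte-backed key; equal only to other BinaryKeySketch instances. */
final class BinaryKeySketch {
    private final byte[] bytes;

    private BinaryKeySketch(byte[] bytes) {
        this.bytes = bytes;
    }

    static BinaryKeySketch ofInt(int id) {
        return new BinaryKeySketch(ByteBuffer.allocate(Integer.BYTES).putInt(id).array());
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof BinaryKeySketch && Arrays.equals(bytes, ((BinaryKeySketch) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

final class KeyConsistencyDemo {
    /** Builds a cache whose only entry was stored under the binary representation of key 1. */
    static Map<Object, String> cacheWithBinaryKey() {
        Map<Object, String> cache = new HashMap<>();
        cache.put(BinaryKeySketch.ofInt(1), "(1, 18, female)");
        return cache;
    }
}
```

Looking up `new GenericKeySketch(1)` in that map finds nothing, while `BinaryKeySketch.ofInt(1)` hits. Producing every key through one selector avoids this kind of mismatch.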



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java:
##########
@@ -61,30 +83,72 @@ public void open(Configuration parameters) throws Exception {
         this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
         this.collector =
                 generatedCollector.newInstance(getRuntimeContext().getUserCodeClassLoader());
-
+        if (cacheHandler != null) {
+            cachingCollector = new ResultCachingCollector(cacheHandler);
+        }
         FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
         FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext());
         FunctionUtils.openFunction(fetcher, parameters);
         FunctionUtils.openFunction(collector, parameters);
 
         this.nullRow = new GenericRowData(tableFieldsCount);
         this.outRow = new JoinedRowData();
+        maybeInitializeCacheHandler();
     }
 
     @Override
     public void processElement(RowData in, Context ctx, Collector<RowData> out) throws Exception {
+        // Setup collector
         collector.setCollector(out);
         collector.setInput(in);
         collector.reset();
 
-        // fetcher has copied the input field when object reuse is enabled
-        fetcher.flatMap(in, getFetcherCollector());
+        if (cacheHandler != null) {
+            lookupViaCache(in, out);
+        } else {
+            fetcher.flatMap(in, getFetcherCollector());
+            maybeEmitNullForLeftOuterJoin(in, out);
+        }
+    }
 
-        if (isLeftOuterJoin && !collector.isCollected()) {
-            outRow.replace(in, nullRow);
-            outRow.setRowKind(in.getRowKind());
-            out.collect(outRow);
+    /**
+     * Make a lookup via the cache.
+     *
+     * <p>This function checks the cache first and emits cached records to the collector directly on
+     * cache hit, otherwise it will trigger a lookup in fetcher (user-provided logic) then store the
+     * result into the cache.
+     *
+     * <p>Please notice that the key row stored in the cache will be the input from left table after
+     * applying {@link LookupCacheHandler#getKeyRowFromInput}, and the value will be the raw value
+     * returned by user's fetcher, which means no calculation and projection. For example:
+     *
+     * <p>- Input from left table (id, name): +I(1, Alice)
+     *
+     * <p>- Value return by user's fetcher (id, age, gender): +I(1, 18, female)
+     *
+     * <p>Then the entry stored in the cache would be: +I(1), +I(1, 18, female), even calculation

Review Comment:
   The change flag can be omitted when explaining the cache strategy because 
the cache doesn't store any changelogs and all records should be insert-only.
   
   `+I(1), +I(1, 18, female)` ==> `key=(1), value=(1, 18, female)`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -191,4 +197,113 @@ public static UserDefinedFunction getLookupFunction(
                         "table %s is neither TableSourceTable not LegacyTableSourceTable",
                         temporalTable.getQualifiedName()));
     }
+
+    public static Optional<LookupCacheHandler> getPartialLookupCacheHandler(
+            RelOptTable temporalTable,
+            ReadableConfig tableConfig,
+            ClassLoader classLoader,
+            RowType leftTableRowType,
+            RowType rightTableRowType,
+            Map<Integer, LookupKey> lookupKeys,
+            boolean enableObjectReuse) {
+        // Legacy table source does not support lookup caching
+        if (temporalTable instanceof LegacyTableSourceTable) {
+            return Optional.empty();
+        }
+        LookupTableSource lookupTableSource = getLookupTableSource(temporalTable);
+        LookupTableSource.LookupRuntimeProvider provider =
+                lookupTableSource.getLookupRuntimeProvider(
+                        createLookupContext(lookupKeys.keySet()));
+        if (provider instanceof PartialCachingLookupProvider) {
+            LookupCache cache = ((PartialCachingLookupProvider) provider).getCache();
+            if (cache == null) {
+                return Optional.empty();
+            } else {
+                return buildLookupCacheHandler(
+                        temporalTable,
+                        tableConfig,
+                        classLoader,
+                        leftTableRowType,
+                        rightTableRowType,
+                        lookupKeys,
+                        enableObjectReuse,
+                        cache);
+            }
+        } else if (provider instanceof PartialCachingAsyncLookupProvider) {
+            LookupCache cache = ((PartialCachingAsyncLookupProvider) provider).getCache();
+            if (cache == null) {
+                return Optional.empty();
+            } else {
+                return buildLookupCacheHandler(
+                        temporalTable,
+                        tableConfig,
+                        classLoader,
+                        leftTableRowType,
+                        rightTableRowType,
+                        lookupKeys,
+                        enableObjectReuse,
+                        cache);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static Optional<LookupCacheHandler> buildLookupCacheHandler(
+            RelOptTable temporalTable,
+            ReadableConfig tableConfig,
+            ClassLoader classLoader,
+            RowType leftTableRowType,
+            RowType rightTableRowType,
+            Map<Integer, LookupKey> lookupKeys,
+            boolean enableObjectReuse,
+            LookupCache cache) {
+        RowType keyRowType =
+                RowType.of(
+                        lookupKeys.values().stream()
+                                .filter(key -> key instanceof FieldRefLookupKey)
+                                .map(key -> ((FieldRefLookupKey) key).index)
+                                .map(leftTableRowType::getTypeAt)
+                                .toArray(LogicalType[]::new));
+        if (enableObjectReuse) {

Review Comment:
   The `if else` can be inlined using the ternary conditional operator if the 
`cacheKeySerializer` is not needed.
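
A rough sketch of that simplification, using hypothetical stand-in types (`SerializerSketch`, `CacheHandlerSketch`, `CacheHandlerFactorySketch`) rather than the real Flink classes: once only the value serializer differs between the two branches, the constructor can be called once with a ternary picking that argument. The sketch keeps the branch polarity of the code under review (no serializer when object reuse is enabled).

```java
/** Hypothetical stand-in for RowDataSerializer. */
final class SerializerSketch {
    final String rowType;

    SerializerSketch(String rowType) {
        this.rowType = rowType;
    }
}

/** Hypothetical stand-in for LookupCacheHandler with a single optional serializer. */
final class CacheHandlerSketch {
    final String tableIdentifier;
    final SerializerSketch valueSerializer; // null means values are stored without copying

    CacheHandlerSketch(String tableIdentifier, SerializerSketch valueSerializer) {
        this.tableIdentifier = tableIdentifier;
        this.valueSerializer = valueSerializer;
    }
}

final class CacheHandlerFactorySketch {
    /** One constructor call; the ternary replaces the duplicated if/else branches. */
    static CacheHandlerSketch build(
            String tableIdentifier, String valueRowType, boolean enableObjectReuse) {
        SerializerSketch valueSerializer =
                enableObjectReuse ? null : new SerializerSketch(valueRowType);
        return new CacheHandlerSketch(tableIdentifier, valueSerializer);
    }
}
```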



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
