This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 10a6f41fa12 [FLINK-28532][table] Support full caching in lookup join 
runner using InputFormats as scan runtime provider (#20447)
10a6f41fa12 is described below

commit 10a6f41fa12284f15af55c359c0c0954800f02de
Author: SmirAlex <[email protected]>
AuthorDate: Wed Aug 10 04:12:29 2022 +0700

    [FLINK-28532][table] Support full caching in lookup join runner using 
InputFormats as scan runtime provider (#20447)
---
 .../nodes/exec/common/CommonExecLookupJoin.java    |  11 +-
 .../table/planner/plan/utils/KeySelectorUtil.java  |  29 +++-
 .../table/planner/plan/utils/LookupJoinUtil.java   | 123 +++++++++++++--
 .../planner/codegen/ProjectionCodeGenerator.scala  |  17 ++
 .../physical/common/CommonPhysicalLookupJoin.scala |   9 +-
 .../planner/factories/TestValuesTableFactory.java  |  46 +++++-
 .../runtime/batch/sql/join/LookupJoinITCase.scala  |  47 ++++--
 .../runtime/stream/sql/LookupJoinITCase.scala      |  43 +++--
 .../table/lookup/CachingLookupFunction.java        |  24 ++-
 .../table/lookup/fullcache/CacheLoader.java        | 116 ++++++++++++++
 .../table/lookup/fullcache/LookupFullCache.java    | 101 ++++++++++++
 .../lookup/fullcache/ReloadTriggerContext.java     |  59 +++++++
 .../inputformat/InputFormatCacheLoader.java        | 159 +++++++++++++++++++
 .../inputformat/InputSplitCacheLoadTask.java       | 127 +++++++++++++++
 .../keyselector/GenericRowDataKeySelector.java     |  65 ++++++++
 .../table/fullcache/FullCacheTestInputFormat.java  | 175 +++++++++++++++++++++
 .../fullcache/TestManualCacheReloadTrigger.java    |  49 ++++++
 17 files changed, 1132 insertions(+), 68 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index aaa2030ae65..facbb5ab50d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -249,7 +249,11 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
         // upsertMaterialize only works on sync lookup mode, async lookup is 
unsupported.
         if (!inputInsertOnly && upsertMaterialize) {
             userDefinedFunction =
-                    LookupJoinUtil.getLookupFunction(temporalTable, 
lookupKeys.keySet(), true);
+                    LookupJoinUtil.getLookupFunction(
+                            temporalTable,
+                            lookupKeys.keySet(),
+                            planner.getFlinkContext().getClassLoader(),
+                            true);
             UserDefinedFunctionHelper.prepareInstance(config, 
userDefinedFunction);
 
             return createSyncLookupJoinWithState(
@@ -268,7 +272,10 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
                     lookupKeyContainsPrimaryKey);
         } else {
             userDefinedFunction =
-                    LookupJoinUtil.getLookupFunction(temporalTable, 
lookupKeys.keySet());
+                    LookupJoinUtil.getLookupFunction(
+                            temporalTable,
+                            lookupKeys.keySet(),
+                            planner.getFlinkContext().getClassLoader());
             if (userDefinedFunction instanceof AsyncTableFunction) {
                 isAsyncEnabled = true;
             }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
index eacab8d7b2c..fff2f95f0f5 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
@@ -19,20 +19,30 @@
 package org.apache.flink.table.planner.plan.utils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
 import org.apache.flink.table.runtime.generated.GeneratedProjection;
 import org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector;
 import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 /** Utility for KeySelector. */
 public class KeySelectorUtil {
 
+    public static RowDataKeySelector getRowDataSelector(
+            ClassLoader classLoader, int[] keyFields, 
InternalTypeInfo<RowData> rowType) {
+        return getRowDataSelector(classLoader, keyFields, rowType, 
BinaryRowData.class);
+    }
+
     /**
      * Create a RowDataKeySelector to extract keys from DataStream which type 
is {@link
      * InternalTypeInfo} of {@link RowData}.
@@ -44,7 +54,10 @@ public class KeySelectorUtil {
      *     InternalTypeInfo} of {@link RowData}.
      */
     public static RowDataKeySelector getRowDataSelector(
-            ClassLoader classLoader, int[] keyFields, 
InternalTypeInfo<RowData> rowType) {
+            ClassLoader classLoader,
+            int[] keyFields,
+            InternalTypeInfo<RowData> rowType,
+            Class<? extends RowData> outClass) {
         if (keyFields.length > 0) {
             LogicalType[] inputFieldTypes = rowType.toRowFieldTypes();
             LogicalType[] keyFieldTypes = new LogicalType[keyFields.length];
@@ -61,9 +74,19 @@ public class KeySelectorUtil {
                             "KeyProjection",
                             inputType,
                             returnType,
-                            keyFields);
+                            keyFields,
+                            outClass);
             InternalTypeInfo<RowData> keyRowType = 
InternalTypeInfo.of(returnType);
-            return new BinaryRowDataKeySelector(keyRowType, 
generatedProjection);
+            if (outClass == BinaryRowData.class) {
+                return new BinaryRowDataKeySelector(keyRowType, 
generatedProjection);
+            } else if (outClass == GenericRowData.class) {
+                RowDataSerializer keySerializer = 
InternalSerializers.create(returnType);
+                return new GenericRowDataKeySelector(
+                        keyRowType, keySerializer, generatedProjection);
+            } else {
+                throw new UnsupportedOperationException(
+                        "Currently only GenericRowData and BinaryRowData 
supported as outClass of KeySelector.");
+            }
         } else {
             return EmptyRowDataKeySelector.INSTANCE;
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index fffe805cb08..5545dda5da6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -19,22 +19,38 @@
 package org.apache.flink.table.planner.plan.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
+import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import 
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import 
org.apache.flink.table.connector.source.lookup.FullCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
 import 
org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
 import 
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import 
org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
 import 
org.apache.flink.table.runtime.functions.table.lookup.CachingAsyncLookupFunction;
 import 
org.apache.flink.table.runtime.functions.table.lookup.CachingLookupFunction;
+import 
org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
+import 
org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
+import 
org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader;
+import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.sources.LookupableTableSource;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -153,8 +169,8 @@ public final class LookupJoinUtil {
 
     /** Gets LookupFunction from temporal table according to the given lookup 
keys. */
     public static UserDefinedFunction getLookupFunction(
-            RelOptTable temporalTable, Collection<Integer> lookupKeys) {
-        return getLookupFunction(temporalTable, lookupKeys, false);
+            RelOptTable temporalTable, Collection<Integer> lookupKeys, 
ClassLoader classLoader) {
+        return getLookupFunction(temporalTable, lookupKeys, classLoader, 
false);
     }
 
     /**
@@ -163,23 +179,16 @@ public final class LookupJoinUtil {
      * implemented.
      */
     public static UserDefinedFunction getLookupFunction(
-            RelOptTable temporalTable, Collection<Integer> lookupKeys, boolean 
requireSyncLookup) {
+            RelOptTable temporalTable,
+            Collection<Integer> lookupKeys,
+            ClassLoader classLoader,
+            boolean requireSyncLookup) {
 
         int[] lookupKeyIndicesInOrder = getOrderedLookupKeys(lookupKeys);
 
         if (temporalTable instanceof TableSourceTable) {
-            // TODO: support nested lookup keys in the future,
-            //  currently we only support top-level lookup keys
-            int[][] indices =
-                    IntStream.of(lookupKeyIndicesInOrder)
-                            .mapToObj(i -> new int[] {i})
-                            .toArray(int[][]::new);
-            LookupTableSource tableSource =
-                    (LookupTableSource) ((TableSourceTable) 
temporalTable).tableSource();
-            LookupRuntimeProviderContext providerContext =
-                    new LookupRuntimeProviderContext(indices);
             LookupTableSource.LookupRuntimeProvider provider =
-                    tableSource.getLookupRuntimeProvider(providerContext);
+                    getLookupRuntimeProvider(temporalTable, lookupKeys);
 
             // TODO this method will be refactored in FLINK-28848
             if (requireSyncLookup
@@ -199,6 +208,19 @@ public final class LookupJoinUtil {
                     return new CachingLookupFunction(
                             partialCachingLookupProvider.getCache(),
                             
partialCachingLookupProvider.createLookupFunction());
+                } else if (provider instanceof FullCachingLookupProvider) {
+                    FullCachingLookupProvider fullCachingLookupProvider =
+                            (FullCachingLookupProvider) provider;
+                    RowType tableSourceRowType =
+                            
FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
+                    LookupFullCache fullCache =
+                            createFullCache(
+                                    fullCachingLookupProvider,
+                                    lookupKeyIndicesInOrder,
+                                    classLoader,
+                                    tableSourceRowType);
+                    return new CachingLookupFunction(
+                            fullCache, 
fullCachingLookupProvider.createLookupFunction());
                 }
                 return ((LookupFunctionProvider) 
provider).createLookupFunction();
             } else if (provider instanceof AsyncLookupFunctionProvider) {
@@ -246,4 +268,77 @@ public final class LookupJoinUtil {
                         "table %s is neither TableSourceTable not 
LegacyTableSourceTable",
                         temporalTable.getQualifiedName()));
     }
+
+    public static boolean isAsyncLookup(RelOptTable temporalTable, 
Collection<Integer> lookupKeys) {
+        if (temporalTable instanceof TableSourceTable) {
+            LookupTableSource.LookupRuntimeProvider provider =
+                    getLookupRuntimeProvider(temporalTable, lookupKeys);
+            return provider instanceof AsyncLookupFunctionProvider
+                    || provider instanceof AsyncTableFunctionProvider;
+        } else if (temporalTable instanceof LegacyTableSourceTable) {
+            LegacyTableSourceTable<?> legacyTableSourceTable =
+                    (LegacyTableSourceTable<?>) temporalTable;
+            LookupableTableSource<?> lookupableTableSource =
+                    (LookupableTableSource<?>) 
legacyTableSourceTable.tableSource();
+            return lookupableTableSource.isAsyncEnabled();
+        }
+        throw new TableException(
+                String.format(
+                        "table %s is neither TableSourceTable not 
LegacyTableSourceTable",
+                        temporalTable.getQualifiedName()));
+    }
+
+    private static LookupTableSource.LookupRuntimeProvider 
getLookupRuntimeProvider(
+            RelOptTable temporalTable, Collection<Integer> lookupKeys) {
+        int[] lookupKeyIndicesInOrder = getOrderedLookupKeys(lookupKeys);
+        // TODO: support nested lookup keys in the future,
+        //  currently we only support top-level lookup keys
+        int[][] indices =
+                IntStream.of(lookupKeyIndicesInOrder)
+                        .mapToObj(i -> new int[] {i})
+                        .toArray(int[][]::new);
+        LookupTableSource tableSource =
+                (LookupTableSource) ((TableSourceTable) 
temporalTable).tableSource();
+        LookupRuntimeProviderContext providerContext = new 
LookupRuntimeProviderContext(indices);
+        return tableSource.getLookupRuntimeProvider(providerContext);
+    }
+
+    private static LookupFullCache createFullCache(
+            FullCachingLookupProvider provider,
+            int[] lookupKeyIndicesInOrder,
+            ClassLoader classLoader,
+            RowType tableSourceRowType) {
+
+        ScanTableSource.ScanRuntimeProvider scanProvider = 
provider.getScanRuntimeProvider();
+        Preconditions.checkArgument(
+                scanProvider.isBounded(),
+                "ScanRuntimeProvider that is used for data loading in "
+                        + "lookup 'FULL' cache must be bounded.");
+
+        GenericRowDataKeySelector lookupTableKeySelector =
+                (GenericRowDataKeySelector)
+                        KeySelectorUtil.getRowDataSelector(
+                                classLoader,
+                                lookupKeyIndicesInOrder,
+                                InternalTypeInfo.of(tableSourceRowType),
+                                GenericRowData.class);
+
+        if (scanProvider instanceof InputFormatProvider) {
+            InputFormat<RowData, ?> inputFormat =
+                    ((InputFormatProvider) scanProvider).createInputFormat();
+            CacheLoader cacheLoader =
+                    new InputFormatCacheLoader(
+                            inputFormat,
+                            lookupTableKeySelector,
+                            InternalSerializers.create(tableSourceRowType));
+            return new LookupFullCache(cacheLoader, 
provider.getCacheReloadTrigger());
+        } else if (scanProvider instanceof SourceFunctionProvider) {
+            // TODO support SourceFunctions
+            throw new UnsupportedOperationException(
+                    "Full caching using SourceFunction currently not 
supported.");
+        } else {
+            throw new UnsupportedOperationException(
+                    "Currently only InputFormatProvider and 
SourceFunctionProvider are supported as ScanRuntimeProviders for Full caching 
lookup join.");
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
index 82d3b7c7ad0..496af1394fe 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
@@ -138,4 +138,21 @@ object ProjectionCodeGenerator {
       outputType,
       inputMapping,
       inputTerm = DEFAULT_INPUT1_TERM)
+
+  /** For java invoke. */
+  def generateProjection(
+      ctx: CodeGeneratorContext,
+      name: String,
+      inputType: RowType,
+      outputType: RowType,
+      inputMapping: Array[Int],
+      outClass: Class[_ <: RowData]): GeneratedProjection =
+    generateProjection(
+      ctx,
+      name,
+      inputType,
+      outputType,
+      inputMapping,
+      outClass = outClass,
+      inputTerm = DEFAULT_INPUT1_TERM)
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
index 0d6d964c1fe..d8193eed182 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
@@ -19,7 +19,6 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.common
 
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.catalog.{ObjectIdentifier, UniqueConstraint}
-import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction, 
UserDefinedFunction}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
 import org.apache.flink.table.planner.plan.schema.{IntermediateRelTable, 
LegacyTableSourceTable, TableSourceTable}
@@ -160,12 +159,8 @@ abstract class CommonPhysicalLookupJoin(
       case t: LegacyTableSourceTable[_] => t.tableIdentifier
     }
 
-    val lookupFunction: UserDefinedFunction =
-      LookupJoinUtil.getLookupFunction(temporalTable, 
allLookupKeys.keys.map(Int.box).toList.asJava)
-    val isAsyncEnabled: Boolean = lookupFunction match {
-      case _: TableFunction[_] => false
-      case _: AsyncTableFunction[_] => true
-    }
+    val isAsyncEnabled: Boolean =
+      LookupJoinUtil.isAsyncLookup(temporalTable, 
allLookupKeys.keys.map(Int.box).toList.asJava)
 
     super
       .explainTerms(pw)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 44d7c537be2..60967f8494d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -69,13 +69,17 @@ import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata
 import 
org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
 import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import 
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import 
org.apache.flink.table.connector.source.lookup.FullCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
 import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import 
org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
 import 
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import 
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+import 
org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.expressions.AggregateExpression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -100,6 +104,7 @@ import 
org.apache.flink.table.planner.functions.aggfunctions.SumAggFunction;
 import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
 import org.apache.flink.table.planner.utils.FilterUtils;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import 
org.apache.flink.table.runtime.functions.table.fullcache.FullCacheTestInputFormat;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -136,6 +141,11 @@ import java.util.stream.Collectors;
 import scala.collection.Seq;
 
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME;
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
@@ -410,6 +420,10 @@ public final class TestValuesTableFactory
         if 
(helper.getOptions().get(CACHE_TYPE).equals(LookupOptions.LookupCacheType.PARTIAL))
 {
             cache = DefaultLookupCache.fromConfig(helper.getOptions());
         }
+        CacheReloadTrigger reloadTrigger = null;
+        if 
(helper.getOptions().get(CACHE_TYPE).equals(LookupOptions.LookupCacheType.FULL))
 {
+            reloadTrigger = 
PeriodicCacheReloadTrigger.fromConfig(helper.getOptions());
+        }
 
         Optional<List<String>> filterableFields =
                 helper.getOptions().getOptional(FILTERABLE_FIELDS);
@@ -525,7 +539,8 @@ public final class TestValuesTableFactory
                         partitions,
                         readableMetadata,
                         null,
-                        cache);
+                        cache,
+                        reloadTrigger);
             }
         } else {
             try {
@@ -631,7 +646,12 @@ public final class TestValuesTableFactory
                         PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
                         PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
                         PARTIAL_CACHE_CACHE_MISSING_KEY,
-                        PARTIAL_CACHE_MAX_ROWS));
+                        PARTIAL_CACHE_MAX_ROWS,
+                        FULL_CACHE_RELOAD_STRATEGY,
+                        FULL_CACHE_PERIODIC_RELOAD_INTERVAL,
+                        FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE,
+                        FULL_CACHE_TIMED_RELOAD_ISO_TIME,
+                        FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS));
     }
 
     private static int validateAndExtractRowtimeIndex(
@@ -1474,6 +1494,7 @@ public final class TestValuesTableFactory
 
         private final @Nullable String lookupFunctionClass;
         private final @Nullable LookupCache cache;
+        private final @Nullable CacheReloadTrigger reloadTrigger;
         private final boolean isAsync;
 
         private TestValuesScanLookupTableSource(
@@ -1495,7 +1516,8 @@ public final class TestValuesTableFactory
                 List<Map<String, String>> allPartitions,
                 Map<String, DataType> readableMetadata,
                 @Nullable int[] projectedMetadataFields,
-                @Nullable LookupCache cache) {
+                @Nullable LookupCache cache,
+                @Nullable CacheReloadTrigger reloadTrigger) {
             super(
                     producedDataType,
                     changelogMode,
@@ -1516,6 +1538,7 @@ public final class TestValuesTableFactory
             this.lookupFunctionClass = lookupFunctionClass;
             this.isAsync = isAsync;
             this.cache = cache;
+            this.reloadTrigger = reloadTrigger;
         }
 
         @SuppressWarnings({"unchecked", "rawtypes"})
@@ -1569,10 +1592,18 @@ public final class TestValuesTableFactory
             } else {
                 TestValuesLookupFunction lookupFunction =
                         new TestValuesLookupFunction(data, lookupIndices, 
converter);
-                if (cache == null) {
-                    return LookupFunctionProvider.of(lookupFunction);
-                } else {
+                if (cache != null) {
                     return PartialCachingLookupProvider.of(lookupFunction, 
cache);
+                } else if (reloadTrigger != null) {
+                    DataFormatConverters.RowConverter rowConverter =
+                            new DataFormatConverters.RowConverter(
+                                    producedDataType.getChildren().toArray(new 
DataType[] {}));
+                    FullCacheTestInputFormat inputFormat =
+                            new FullCacheTestInputFormat(data, rowConverter);
+                    return FullCachingLookupProvider.of(
+                            InputFormatProvider.of(inputFormat), 
reloadTrigger);
+                } else {
+                    return LookupFunctionProvider.of(lookupFunction);
                 }
             }
         }
@@ -1598,7 +1629,8 @@ public final class TestValuesTableFactory
                     allPartitions,
                     readableMetadata,
                     projectedMetadataFields,
-                    cache);
+                    cache,
+                    reloadTrigger);
         }
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
index aaf22ab7150..219c6bbef12 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.runtime.batch.sql.join
 
 import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.connector.source.lookup.LookupOptions
+import 
org.apache.flink.table.connector.source.lookup.LookupOptions.{LookupCacheType, 
ReloadStrategy}
 import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.data.binary.BinaryStringData
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
@@ -39,7 +40,7 @@ import java.util
 import scala.collection.JavaConversions._
 
 @RunWith(classOf[Parameterized])
-class LookupJoinITCase(legacyTableSource: Boolean, isAsyncMode: Boolean, 
enableCache: Boolean)
+class LookupJoinITCase(legacyTableSource: Boolean, isAsyncMode: Boolean, 
cacheType: LookupCacheType)
   extends BatchTestBase {
 
   val data = List(
@@ -104,11 +105,17 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
isAsyncMode: Boolean, enableC
     } else {
       val dataId = TestValuesTableFactory.registerData(data)
       val cacheOptions =
-        if (enableCache)
+        if (cacheType == LookupCacheType.PARTIAL)
           s"""
-             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupOptions.LookupCacheType.PARTIAL}',
+             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupCacheType.PARTIAL}',
              |  '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' = 
'${Long.MaxValue}',
              |""".stripMargin
+        else if (cacheType == LookupCacheType.FULL)
+          s"""
+             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupCacheType.FULL}',
+             |  '${LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key()}' = 
'${ReloadStrategy.PERIODIC}',
+             |  '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' = 
'${Long.MaxValue}',
+             |""".stripMargin
         else ""
 
       tEnv.executeSql(s"""
@@ -131,11 +138,17 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
isAsyncMode: Boolean, enableC
     if (!legacyTableSource) {
       val dataId = TestValuesTableFactory.registerData(data)
       val cacheOptions =
-        if (enableCache)
+        if (cacheType == LookupCacheType.PARTIAL)
           s"""
-             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupOptions.LookupCacheType.PARTIAL}',
+             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupCacheType.PARTIAL}',
              |  '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' = 
'${Long.MaxValue}',
              |""".stripMargin
+        else if (cacheType == LookupCacheType.FULL)
+          s"""
+             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupCacheType.FULL}',
+             |  '${LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key()}' = 
'${ReloadStrategy.PERIODIC}',
+             |  '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' = 
'${Long.MaxValue}',
+             |""".stripMargin
         else ""
       tEnv.executeSql(s"""
                          |CREATE TABLE $tableName (
@@ -331,7 +344,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
isAsyncMode: Boolean, enableC
 
   @Test
   def testLookupCacheSharingAcrossSubtasks(): Unit = {
-    if (!enableCache) {
+    if (cacheType == LookupCacheType.NONE) {
       return
     }
     // Keep the cache for later validation
@@ -364,9 +377,10 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
isAsyncMode: Boolean, enableC
       val managedCaches = LookupCacheManager.getInstance().getManagedCaches
       assertThat(managedCaches.size()).isEqualTo(1)
 
-      // Validate 6 entries are cached
+      val numEntries = if (cacheType == LookupCacheType.PARTIAL) 6 else 
userData.size
+      // Validate 6 entries are cached for PARTIAL and all entries for FULL
       val cache = 
managedCaches.get(managedCaches.keySet().iterator().next()).getCache
-      assertThat(cache.size()).isEqualTo(6)
+      assertThat(cache.size()).isEqualTo(numEntries)
 
       // Validate contents of cached entries
       assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(1L))))
@@ -401,18 +415,17 @@ object LookupJoinITCase {
   val DYNAMIC_TABLE_SOURCE: JBoolean = JBoolean.FALSE;
   val ASYNC_MODE: JBoolean = JBoolean.TRUE;
   val SYNC_MODE: JBoolean = JBoolean.FALSE;
-  val ENABLE_CACHE: JBoolean = JBoolean.TRUE;
-  val DISABLE_CACHE: JBoolean = JBoolean.FALSE;
 
-  @Parameterized.Parameters(name = "LegacyTableSource={0}, isAsyncMode = {1}, 
enableCache = {2}")
+  @Parameterized.Parameters(name = "LegacyTableSource={0}, isAsyncMode = {1}, 
cacheType = {2}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
     Seq[Array[AnyRef]](
-      Array(LEGACY_TABLE_SOURCE, ASYNC_MODE, DISABLE_CACHE),
-      Array(LEGACY_TABLE_SOURCE, SYNC_MODE, DISABLE_CACHE),
-      Array(DYNAMIC_TABLE_SOURCE, ASYNC_MODE, DISABLE_CACHE),
-      Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, DISABLE_CACHE),
-      Array(DYNAMIC_TABLE_SOURCE, ASYNC_MODE, ENABLE_CACHE),
-      Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, ENABLE_CACHE)
+      Array(LEGACY_TABLE_SOURCE, ASYNC_MODE, LookupCacheType.NONE),
+      Array(LEGACY_TABLE_SOURCE, SYNC_MODE, LookupCacheType.NONE),
+      Array(DYNAMIC_TABLE_SOURCE, ASYNC_MODE, LookupCacheType.NONE),
+      Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, LookupCacheType.NONE),
+      Array(DYNAMIC_TABLE_SOURCE, ASYNC_MODE, LookupCacheType.PARTIAL),
+      Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, LookupCacheType.PARTIAL),
+      Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, LookupCacheType.FULL)
     )
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index 748026bf750..752a903ca39 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.connector.source.lookup.LookupOptions
+import 
org.apache.flink.table.connector.source.lookup.LookupOptions.{LookupCacheType, 
ReloadStrategy}
 import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.data.binary.BinaryStringData
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
@@ -44,7 +45,8 @@ import java.util.{Collection => JCollection}
 import scala.collection.JavaConversions._
 
 @RunWith(classOf[Parameterized])
-class LookupJoinITCase(legacyTableSource: Boolean, enableCache: Boolean) 
extends StreamingTestBase {
+class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
+  extends StreamingTestBase {
 
   val data = List(
     rowOf(1L, 12, "Julian"),
@@ -108,11 +110,17 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
enableCache: Boolean) extends
     } else {
       val dataId = TestValuesTableFactory.registerData(data)
       val cacheOptions =
-        if (enableCache)
+        if (cacheType == LookupCacheType.PARTIAL)
           s"""
-             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupOptions.LookupCacheType.PARTIAL}',
+             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupCacheType.PARTIAL}',
              |  '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' = 
'${Long.MaxValue}',
              |""".stripMargin
+        else if (cacheType == LookupCacheType.FULL)
+          s"""
+             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupCacheType.FULL}',
+             |  '${LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key()}' = 
'${ReloadStrategy.PERIODIC}',
+             |  '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' = 
'${Long.MaxValue}',
+             |""".stripMargin
         else ""
 
       tEnv.executeSql(s"""
@@ -133,11 +141,17 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
enableCache: Boolean) extends
     if (!legacyTableSource) {
       val dataId = TestValuesTableFactory.registerData(data)
       val cacheOptions =
-        if (enableCache)
+        if (cacheType == LookupCacheType.PARTIAL)
           s"""
-             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupOptions.LookupCacheType.PARTIAL}',
+             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupCacheType.PARTIAL}',
              |  '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' = 
'${Long.MaxValue}',
              |""".stripMargin
+        else if (cacheType == LookupCacheType.FULL)
+          s"""
+             |  '${LookupOptions.CACHE_TYPE.key()}' = 
'${LookupCacheType.FULL}',
+             |  '${LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key()}' = 
'${ReloadStrategy.PERIODIC}',
+             |  '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' = 
'${Long.MaxValue}',
+             |""".stripMargin
         else ""
       tEnv.executeSql(s"""
                          |CREATE TABLE $tableName (
@@ -556,7 +570,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
enableCache: Boolean) extends
 
   @Test
   def testLookupCacheSharingAcrossSubtasks(): Unit = {
-    if (!enableCache) {
+    if (cacheType == LookupCacheType.NONE) {
       return
     }
     // Keep the cache for later validation
@@ -590,9 +604,10 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
enableCache: Boolean) extends
       val managedCaches = LookupCacheManager.getInstance().getManagedCaches
       assertThat(managedCaches.size()).isEqualTo(1)
 
-      // Validate 6 entries are cached
+      val numEntries = if (cacheType == LookupCacheType.PARTIAL) 6 else 
userData.size
+      // Validate 6 entries are cached for PARTIAL and all entries for FULL
       val cache = 
managedCaches.get(managedCaches.keySet().iterator().next()).getCache
-      assertThat(cache.size()).isEqualTo(6)
+      assertThat(cache.size()).isEqualTo(numEntries)
 
       // Validate contents of cached entries
       assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(1L))))
@@ -699,14 +714,14 @@ object LookupJoinITCase {
 
   val LEGACY_TABLE_SOURCE: JBoolean = JBoolean.TRUE;
   val DYNAMIC_TABLE_SOURCE: JBoolean = JBoolean.FALSE;
-  val ENABLE_CACHE: JBoolean = JBoolean.TRUE;
-  val DISABLE_CACHE: JBoolean = JBoolean.FALSE;
 
-  @Parameterized.Parameters(name = "LegacyTableSource={0}, EnableCache={1}")
+  @Parameterized.Parameters(name = "LegacyTableSource={0}, cacheType={1}")
   def parameters(): JCollection[Array[Object]] = {
     Seq[Array[AnyRef]](
-      Array(LEGACY_TABLE_SOURCE, DISABLE_CACHE),
-      Array(DYNAMIC_TABLE_SOURCE, ENABLE_CACHE),
-      Array(DYNAMIC_TABLE_SOURCE, DISABLE_CACHE))
+      Array(LEGACY_TABLE_SOURCE, LookupCacheType.NONE),
+      Array(DYNAMIC_TABLE_SOURCE, LookupCacheType.NONE),
+      Array(DYNAMIC_TABLE_SOURCE, LookupCacheType.PARTIAL),
+      Array(DYNAMIC_TABLE_SOURCE, LookupCacheType.FULL)
+    )
   }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java
index d3bbf3b6d7c..6fe97bb6fc9 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.functions.table.lookup;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.groups.CacheMetricGroup;
@@ -29,6 +30,10 @@ import 
org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.LookupFunction;
+import 
org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -52,7 +57,7 @@ public class CachingLookupFunction extends LookupFunction {
     private static final long UNINITIALIZED = -1;
 
     // The actual user-provided lookup function
-    private final LookupFunction delegate;
+    @Nullable private final LookupFunction delegate;
 
     private LookupCache cache;
     private transient String cacheIdentifier;
@@ -70,7 +75,7 @@ public class CachingLookupFunction extends LookupFunction {
      * actual cache instance will be retrieved from the {@link 
LookupCacheManager} during {@link
      * #open}.
      */
-    public CachingLookupFunction(LookupCache cache, LookupFunction delegate) {
+    public CachingLookupFunction(LookupCache cache, @Nullable LookupFunction 
delegate) {
         this.cache = cache;
         this.delegate = delegate;
     }
@@ -103,7 +108,13 @@ public class CachingLookupFunction extends LookupFunction {
 
         // Initialize cache and the delegating function
         cache.open(cacheMetricGroup);
-        delegate.open(context);
+        if (cache instanceof LookupFullCache) {
+            // TODO add Configuration into FunctionContext
+            ((LookupFullCache) cache).open(new Configuration());
+        }
+        if (delegate != null) {
+            delegate.open(context);
+        }
     }
 
     @Override
@@ -128,7 +139,9 @@ public class CachingLookupFunction extends LookupFunction {
 
     @Override
     public void close() throws Exception {
-        delegate.close();
+        if (delegate != null) {
+            delegate.close();
+        }
         if (cacheIdentifier != null) {
             LookupCacheManager.getInstance().unregisterCache(cacheIdentifier);
         }
@@ -142,6 +155,9 @@ public class CachingLookupFunction extends LookupFunction {
     // -------------------------------- Helper functions 
------------------------------
     private Collection<RowData> lookupByDelegate(RowData keyRow) throws 
IOException {
         try {
+            Preconditions.checkState(
+                    delegate != null,
+                    "User's lookup function can't be null, if there are 
possible cache misses.");
             Collection<RowData> lookupValues = delegate.lookup(keyRow);
             loadCounter.inc();
             updateLatestLoadTime();
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
new file mode 100644
index 00000000000..bbd099833ab
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.lookup.fullcache;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import 
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Abstract task that loads data in Full cache from source provided by {@link 
ScanRuntimeProvider}.
+ */
+public abstract class CacheLoader extends AbstractRichFunction implements 
Runnable, Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CacheLoader.class);
+
+    protected transient volatile ConcurrentHashMap<RowData, 
Collection<RowData>> cache;
+
+    // 2 reloads can't be executed simultaneously, so they are performed under 
lock
+    private final ReentrantLock reloadLock = new ReentrantLock();
+    // runtime waits for the first load to complete to start an execution 
lookup join
+    private CountDownLatch firstLoadLatch;
+
+    // Cache metrics
+    private transient Counter loadCounter;
+    private transient Counter loadFailuresCounter;
+    private transient volatile long latestLoadTimeMs;
+
+    protected abstract void reloadCache() throws Exception;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        firstLoadLatch = new CountDownLatch(1);
+        loadCounter = new ThreadSafeSimpleCounter();
+        loadFailuresCounter = new ThreadSafeSimpleCounter();
+    }
+
+    public void open(CacheMetricGroup cacheMetricGroup) {
+        // Register metrics
+        cacheMetricGroup.loadCounter(loadCounter);
+        cacheMetricGroup.numLoadFailuresCounter(loadFailuresCounter);
+        cacheMetricGroup.numCachedRecordsGauge(() -> (long) cache.size());
+        cacheMetricGroup.latestLoadTimeGauge(() -> latestLoadTimeMs);
+    }
+
+    public ConcurrentHashMap<RowData, Collection<RowData>> getCache() {
+        return cache;
+    }
+
+    public void awaitFirstLoad() throws InterruptedException {
+        firstLoadLatch.await();
+    }
+
+    @Override
+    public void run() {
+        // 2 reloads can't be executed simultaneously
+        reloadLock.lock();
+        try {
+            LOG.info("Lookup 'FULL' cache loading triggered.");
+            long start = System.currentTimeMillis();
+            reloadCache();
+            latestLoadTimeMs = System.currentTimeMillis() - start;
+            loadCounter.inc();
+            LOG.info(
+                    "Lookup 'FULL' cache loading finished. Time elapsed - {} 
ms. Number of records - {}.",
+                    latestLoadTimeMs,
+                    cache.size());
+            if (LOG.isDebugEnabled()) {
+                // 'if' guard statement prevents us from transforming cache to 
string
+                LOG.debug(
+                        "Cache content: \n{\n\t{}\n}",
+                        Joiner.on(",\n\t").withKeyValueSeparator(" = 
").join(cache));
+            }
+        } catch (Exception e) {
+            loadFailuresCounter.inc();
+            throw new RuntimeException("Failed to reload lookup 'FULL' 
cache.", e);
+        } finally {
+            reloadLock.unlock();
+            firstLoadLatch.countDown();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        cache.clear();
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
new file mode 100644
index 00000000000..0a631766199
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.lookup.fullcache;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import 
org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import 
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/** Internal implementation of {@link LookupCache} for {@link 
LookupCacheType#FULL}. */
+public class LookupFullCache implements LookupCache {
+    private static final long serialVersionUID = 1L;
+
+    private final CacheLoader cacheLoader;
+    private final CacheReloadTrigger reloadTrigger;
+
+    private transient volatile ReloadTriggerContext reloadTriggerContext;
+    private transient volatile Throwable reloadFailCause;
+
+    public LookupFullCache(CacheLoader cacheLoader, CacheReloadTrigger 
reloadTrigger) {
+        this.cacheLoader = Preconditions.checkNotNull(cacheLoader);
+        this.reloadTrigger = Preconditions.checkNotNull(reloadTrigger);
+    }
+
+    @Override
+    public synchronized void open(CacheMetricGroup metricGroup) {
+        cacheLoader.open(metricGroup);
+    }
+
+    public synchronized void open(Configuration parameters) throws Exception {
+        if (reloadTriggerContext == null) {
+            cacheLoader.open(parameters);
+            reloadTriggerContext =
+                    new ReloadTriggerContext(
+                            cacheLoader,
+                            th -> {
+                                if (reloadFailCause == null) {
+                                    reloadFailCause = th;
+                                } else {
+                                    reloadFailCause.addSuppressed(th);
+                                }
+                            });
+
+            reloadTrigger.open(reloadTriggerContext);
+            cacheLoader.awaitFirstLoad();
+        }
+    }
+
+    @Override
+    public Collection<RowData> getIfPresent(RowData key) {
+        if (reloadFailCause != null) {
+            throw new RuntimeException(reloadFailCause);
+        }
+        return cacheLoader.getCache().getOrDefault(key, 
Collections.emptyList());
+    }
+
+    @Override
+    public Collection<RowData> put(RowData key, Collection<RowData> value) {
+        throw new UnsupportedOperationException(
+                "Lookup Full cache doesn't support public 'put' operation from 
the outside.");
+    }
+
+    @Override
+    public void invalidate(RowData key) {
+        throw new UnsupportedOperationException(
+                "Lookup Full cache doesn't support public 'invalidate' 
operation from the outside.");
+    }
+
+    @Override
+    public long size() {
+        return cacheLoader.getCache().size();
+    }
+
+    @Override
+    public void close() throws Exception {
+        cacheLoader.close();
+        reloadTrigger.close();
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/ReloadTriggerContext.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/ReloadTriggerContext.java
new file mode 100644
index 00000000000..f1dad16113a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/ReloadTriggerContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.lookup.fullcache;
+
+import 
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/** Runtime implementation of {@link CacheReloadTrigger.Context}. */
+public class ReloadTriggerContext implements CacheReloadTrigger.Context {
+
+    private final Runnable reloadTask;
+    private final Consumer<Throwable> reloadFailCallback;
+
+    public ReloadTriggerContext(Runnable reloadTask, Consumer<Throwable> 
reloadFailCallback) {
+        this.reloadTask = reloadTask;
+        this.reloadFailCallback = reloadFailCallback;
+    }
+
+    @Override
+    public long currentProcessingTime() {
+        // TODO add processingTime into FunctionContext
+        return System.currentTimeMillis();
+    }
+
+    @Override
+    public long currentWatermark() {
+        // TODO add watermarks into FunctionContext
+        throw new UnsupportedOperationException(
+                "Watermarks are currently unsupported in cache reload 
triggers.");
+    }
+
+    @Override
+    public CompletableFuture<Void> triggerReload() {
+        return CompletableFuture.runAsync(reloadTask)
+                .exceptionally(
+                        th -> {
+                            reloadFailCallback.accept(th);
+                            return null;
+                        });
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
new file mode 100644
index 00000000000..f7b95dbd694
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
+import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/** {@link CacheLoader} that used {@link InputFormat} for loading data into 
the cache. */
+public class InputFormatCacheLoader extends CacheLoader {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(InputFormatCacheLoader.class);
+
+    private final InputFormat<RowData, InputSplit> initialInputFormat;
+    private final GenericRowDataKeySelector keySelector;
+    private final RowDataSerializer cacheEntriesSerializer;
+
+    private transient volatile List<InputSplitCacheLoadTask> cacheLoadTasks;
+    private transient Configuration parameters;
+
+    private volatile boolean isStopped;
+
+    public InputFormatCacheLoader(
+            InputFormat<RowData, ?> initialInputFormat,
+            GenericRowDataKeySelector keySelector,
+            RowDataSerializer cacheEntriesSerializer) {
+        // noinspection unchecked
+        this.initialInputFormat = (InputFormat<RowData, InputSplit>) 
initialInputFormat;
+        this.keySelector = keySelector;
+        this.cacheEntriesSerializer = cacheEntriesSerializer;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.parameters = parameters;
+        this.initialInputFormat.configure(parameters);
+    }
+
+    @Override
+    protected void reloadCache() throws Exception {
+        InputSplit[] inputSplits = createInputSplits();
+        int numSplits = inputSplits.length;
+        // load data into the another copy of cache
+        // notice: it requires twice more memory, but on the other hand we 
don't need any blocking
+        // cache has default initialCapacity and loadFactor, but overridden 
concurrencyLevel
+        ConcurrentHashMap<RowData, Collection<RowData>> newCache =
+                new ConcurrentHashMap<>(16, 0.75f, 
getConcurrencyLevel(numSplits));
+        this.cacheLoadTasks =
+                Arrays.stream(inputSplits)
+                        .map(split -> createCacheLoadTask(split, newCache))
+                        .collect(Collectors.toList());
+
+        // run first task and start numTasks - 1 threads to run remaining tasks
+        ExecutorService cacheLoadTaskService = null;
+        List<Future<?>> futures = null;
+        if (numSplits > 1) {
+            futures = new ArrayList<>();
+            int numThreads = getConcurrencyLevel(numSplits) - 1;
+            cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
+            for (int i = 1; i < numSplits; i++) {
+                Future<?> future = 
cacheLoadTaskService.submit(cacheLoadTasks.get(i));
+                futures.add(future);
+            }
+        }
+        cacheLoadTasks.get(0).run();
+        if (cacheLoadTaskService != null) {
+            for (Future<?> future : futures) {
+                future.get(); // if any exception occurs it will be thrown here
+            }
+            cacheLoadTaskService.shutdownNow();
+        }
+        if (!isStopped) {
+            // reassigning cache field is safe, because it's volatile
+            cache = newCache;
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        isStopped = true;
+        if (cacheLoadTasks != null) {
+            cacheLoadTasks.forEach(InputSplitCacheLoadTask::stopRunning);
+        }
+    }
+
+    private InputSplitCacheLoadTask createCacheLoadTask(
+            InputSplit inputSplit, ConcurrentHashMap<RowData, 
Collection<RowData>> newCache) {
+        try {
+            // InputFormat and KeySelector are not thread-safe,
+            // so we create copies of them for each task
+            InputFormat<RowData, InputSplit> inputFormat =
+                    InstantiationUtil.clone(initialInputFormat);
+            inputFormat.configure(parameters);
+            return new InputSplitCacheLoadTask(
+                    newCache, keySelector.copy(), cacheEntriesSerializer, 
inputFormat, inputSplit);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create 
InputFormatCacheLoadTask", e);
+        }
+    }
+
+    private InputSplit[] createInputSplits() throws IOException {
+        InputSplit[] inputSplits = 
this.initialInputFormat.createInputSplits(1);
+        if (LOG.isDebugEnabled()) {
+            // 'if' guard statement prevents us from transforming splits to 
string
+            LOG.debug(
+                    "InputFormat created {} InputSplits: {}",
+                    inputSplits.length,
+                    Arrays.deepToString(inputSplits));
+        }
+        Preconditions.checkState(
+                inputSplits.length >= 1,
+                "InputFormat must provide at least one input split to load 
data into the lookup 'FULL' cache.");
+        return inputSplits;
+    }
+
+    private int getConcurrencyLevel(int numSplits) {
+        // creating many threads will cause performance issues because of 
context switching
+        int numOfCores = Runtime.getRuntime().availableProcessors();
+        return Math.min(numSplits, numOfCores);
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
new file mode 100644
index 00000000000..8610b206fff
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Parallel task that loads data into the cache from {@link InputFormat} with 
specified {@link
+ * InputSplit}.
+ */
+public class InputSplitCacheLoadTask implements Runnable {
+
+    private final ConcurrentHashMap<RowData, Collection<RowData>> cache;
+    private final RowDataKeySelector keySelector;
+    private final RowDataSerializer cacheEntriesSerializer;
+    private final InputFormat<RowData, InputSplit> inputFormat;
+    private final InputSplit inputSplit;
+
+    private volatile boolean isRunning = true;
+
+    public InputSplitCacheLoadTask(
+            ConcurrentHashMap<RowData, Collection<RowData>> cache,
+            RowDataKeySelector keySelector,
+            RowDataSerializer cacheEntriesSerializer,
+            InputFormat<RowData, InputSplit> inputFormat,
+            InputSplit inputSplit) {
+        this.cache = cache;
+        this.keySelector = keySelector;
+        this.inputFormat = inputFormat;
+        this.cacheEntriesSerializer = cacheEntriesSerializer;
+        this.inputSplit = inputSplit;
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (inputFormat instanceof RichInputFormat) {
+                ((RichInputFormat<?, ?>) inputFormat).openInputFormat();
+            }
+            inputFormat.open(inputSplit);
+            RowData nextElement = new 
BinaryRowData(cacheEntriesSerializer.getArity());
+            while (isRunning && !inputFormat.reachedEnd()) {
+                nextElement = inputFormat.nextRecord(nextElement);
+                if (nextElement != null) {
+                    if (nextElement.getRowKind() != RowKind.INSERT) {
+                        throw new IllegalStateException(
+                                "InputFormat must provide only INSERT records 
in lookup 'FULL' cache. Received record "
+                                        + nextElement);
+                    }
+                    RowData record = cacheEntriesSerializer.copy(nextElement);
+                    RowData key = keySelector.getKey(record);
+                    if (hasNoNulls(key)) {
+                        Collection<RowData> resultRows =
+                                cache.computeIfAbsent(
+                                        key,
+                                        // collection must be thread-safe and 
can possibly
+                                        // contain duplicates
+                                        k -> new ConcurrentLinkedQueue<>());
+                        resultRows.add(record);
+                    }
+                } else {
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to load data into the lookup 'FULL' cache from 
InputSplit "
+                            + inputSplit,
+                    e);
+        } finally {
+            try {
+                inputFormat.close();
+                if (inputFormat instanceof RichInputFormat) {
+                    ((RichInputFormat<?, ?>) inputFormat).closeInputFormat();
+                }
+            } catch (IOException e) {
+                // can't do try-with-resources
+                throw new RuntimeException("Failed to close InputFormat.", e);
+            }
+        }
+    }
+
+    public void stopRunning() {
+        isRunning = false;
+    }
+
+    private static boolean hasNoNulls(RowData keys) {
+        // in SQL comparison 'null = null' returns false
+        // IS NOT DISTINCT FROM (which returns true) is not supported in 
lookup join
+        // so to prevent equality of 2 nulls when searching in cache
+        // we must not store rows with nulls in order not to violate semantics 
of '='
+        for (int i = 0; i < keys.getArity(); i++) {
+            if (keys.isNullAt(i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
new file mode 100644
index 00000000000..13f01fd7d76
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.keyselector;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+
+/** A KeySelector which will extract key from RowData. The key type is 
GenericRowData. */
+public class GenericRowDataKeySelector implements RowDataKeySelector {
+
+    private static final long serialVersionUID = 1L;
+
+    private final InternalTypeInfo<RowData> keyRowType;
+    private final RowDataSerializer keySerializer;
+    private final GeneratedProjection generatedProjection;
+    private transient Projection<RowData, GenericRowData> projection;
+
+    public GenericRowDataKeySelector(
+            InternalTypeInfo<RowData> keyRowType,
+            RowDataSerializer keySerializer,
+            GeneratedProjection generatedProjection) {
+        this.keyRowType = keyRowType;
+        this.generatedProjection = generatedProjection;
+        this.keySerializer = keySerializer;
+    }
+
+    @Override
+    public RowData getKey(RowData value) throws Exception {
+        if (projection == null) {
+            ClassLoader cl = Thread.currentThread().getContextClassLoader();
+            //noinspection unchecked
+            projection = generatedProjection.newInstance(cl);
+        }
+        return keySerializer.copy(projection.apply(value));
+    }
+
+    @Override
+    public InternalTypeInfo<RowData> getProducedType() {
+        return keyRowType;
+    }
+
+    public GenericRowDataKeySelector copy() {
+        return new GenericRowDataKeySelector(keyRowType, keySerializer, 
generatedProjection);
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/FullCacheTestInputFormat.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/FullCacheTestInputFormat.java
new file mode 100644
index 00000000000..8084a279276
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/FullCacheTestInputFormat.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.fullcache;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** TestInputFormat that reads data from (2 + delta) splits which share the 
same {@code queue}. */
+public class FullCacheTestInputFormat
+        extends RichInputFormat<RowData, 
FullCacheTestInputFormat.QueueInputSplit> {
+
+    private static final int DEFAULT_NUM_SPLITS = 2;
+    private static final AtomicInteger OPEN_CLOSED_COUNTER = new 
AtomicInteger(0);
+
+    // RowData is not serializable, so we store Rows
+    private final Collection<Row> dataRows;
+    private final DataFormatConverters.RowConverter rowConverter;
+    private final int deltaNumSplits;
+    private final transient Consumer<Collection<RowData>> secondLoadDataChange;
+
+    private transient ConcurrentLinkedQueue<RowData> queue;
+    private int loadCounter;
+    private int maxReadRecords;
+    private int readRecords;
+
+    private int numOpens;
+
+    public FullCacheTestInputFormat(
+            Collection<Row> dataRows,
+            DataFormatConverters.RowConverter rowConverter,
+            int deltaNumSplits,
+            Consumer<Collection<RowData>> secondLoadDataChange) {
+        // for unit tests
+        this.dataRows = dataRows;
+        this.rowConverter = rowConverter;
+        this.deltaNumSplits = deltaNumSplits;
+        this.secondLoadDataChange = secondLoadDataChange;
+    }
+
+    public FullCacheTestInputFormat(
+            Collection<Row> dataRows, DataFormatConverters.RowConverter 
rowConverter) {
+        // for integration tests
+        this.dataRows = dataRows;
+        this.rowConverter = rowConverter;
+        this.deltaNumSplits = 0;
+        this.secondLoadDataChange = null;
+    }
+
+    @Override
+    public QueueInputSplit[] createInputSplits(int minNumSplits) throws 
IOException {
+        int delta = loadCounter > 0 ? deltaNumSplits : 0;
+        int numSplits = DEFAULT_NUM_SPLITS + delta;
+        ConcurrentLinkedQueue<RowData> queue = new ConcurrentLinkedQueue<>();
+        QueueInputSplit[] splits = new QueueInputSplit[numSplits];
+        IntStream.range(0, numSplits).forEach(i -> splits[i] = new 
QueueInputSplit(queue, i));
+        dataRows.forEach(row -> queue.add(rowConverter.toInternal(row)));
+        // divide data evenly between InputFormat copies
+        loadCounter++;
+        if (loadCounter == 2 && secondLoadDataChange != null) {
+            secondLoadDataChange.accept(queue);
+        }
+        maxReadRecords = (int) Math.ceil((double) queue.size() / numSplits);
+        return splits;
+    }
+
+    @Override
+    public void openInputFormat() {
+        numOpens++;
+        OPEN_CLOSED_COUNTER.incrementAndGet();
+    }
+
+    @Override
+    public void open(QueueInputSplit split) throws IOException {
+        this.queue = split.getQueue();
+        this.readRecords = 0;
+        numOpens++;
+        OPEN_CLOSED_COUNTER.incrementAndGet();
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return queue.isEmpty();
+    }
+
+    @Override
+    public RowData nextRecord(RowData reuse) throws IOException {
+        assertThat(numOpens).isEqualTo(2);
+        if (readRecords == maxReadRecords) {
+            return null;
+        }
+        readRecords++;
+        return queue.poll();
+    }
+
+    @Override
+    public InputSplitAssigner getInputSplitAssigner(QueueInputSplit[] 
inputSplits) {
+        return new DefaultInputSplitAssigner(inputSplits);
+    }
+
+    @Override
+    public void configure(Configuration parameters) {}
+
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) 
throws IOException {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+        OPEN_CLOSED_COUNTER.decrementAndGet();
+    }
+
+    @Override
+    public void closeInputFormat() {
+        OPEN_CLOSED_COUNTER.decrementAndGet();
+    }
+
+    public boolean isClosed() {
+        return OPEN_CLOSED_COUNTER.get() == 0;
+    }
+
+    /** {@link InputSplit} that provides queue to {@link InputFormat}. */
+    public static class QueueInputSplit implements InputSplit {
+
+        private final transient ConcurrentLinkedQueue<RowData> queue;
+        private final int splitNumber;
+
+        public QueueInputSplit(ConcurrentLinkedQueue<RowData> queue, int 
splitNumber) {
+            this.queue = queue;
+            this.splitNumber = splitNumber;
+        }
+
+        @Override
+        public int getSplitNumber() {
+            return splitNumber;
+        }
+
+        public ConcurrentLinkedQueue<RowData> getQueue() {
+            return queue;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java
new file mode 100644
index 00000000000..3c4043313e8
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.fullcache;
+
+import 
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+
+/** Test implementation of {@link CacheReloadTrigger} that is triggered 
manually. */
+public class TestManualCacheReloadTrigger implements CacheReloadTrigger {
+
+    private Context context;
+    private boolean isClosed;
+
+    public void trigger() throws Exception {
+        if (context != null) {
+            context.triggerReload().get();
+        }
+    }
+
+    @Override
+    public void open(Context context) throws Exception {
+        this.context = context;
+        trigger();
+    }
+
+    @Override
+    public void close() throws Exception {
+        isClosed = true;
+    }
+
+    public boolean isClosed() {
+        return isClosed;
+    }
+}

Reply via email to