This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 10a6f41fa12 [FLINK-28532][table] Support full caching in lookup join runner using InputFormats as scan runtime provider (#20447)
10a6f41fa12 is described below
commit 10a6f41fa12284f15af55c359c0c0954800f02de
Author: SmirAlex <[email protected]>
AuthorDate: Wed Aug 10 04:12:29 2022 +0700
[FLINK-28532][table] Support full caching in lookup join runner using InputFormats as scan runtime provider (#20447)
---
.../nodes/exec/common/CommonExecLookupJoin.java | 11 +-
.../table/planner/plan/utils/KeySelectorUtil.java | 29 +++-
.../table/planner/plan/utils/LookupJoinUtil.java | 123 +++++++++++++--
.../planner/codegen/ProjectionCodeGenerator.scala | 17 ++
.../physical/common/CommonPhysicalLookupJoin.scala | 9 +-
.../planner/factories/TestValuesTableFactory.java | 46 +++++-
.../runtime/batch/sql/join/LookupJoinITCase.scala | 47 ++++--
.../runtime/stream/sql/LookupJoinITCase.scala | 43 +++--
.../table/lookup/CachingLookupFunction.java | 24 ++-
.../table/lookup/fullcache/CacheLoader.java | 116 ++++++++++++++
.../table/lookup/fullcache/LookupFullCache.java | 101 ++++++++++++
.../lookup/fullcache/ReloadTriggerContext.java | 59 +++++++
.../inputformat/InputFormatCacheLoader.java | 159 +++++++++++++++++++
.../inputformat/InputSplitCacheLoadTask.java | 127 +++++++++++++++
.../keyselector/GenericRowDataKeySelector.java | 65 ++++++++
.../table/fullcache/FullCacheTestInputFormat.java | 175 +++++++++++++++++++++
.../fullcache/TestManualCacheReloadTrigger.java | 49 ++++++
17 files changed, 1132 insertions(+), 68 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index aaa2030ae65..facbb5ab50d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -249,7 +249,11 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
// upsertMaterialize only works on sync lookup mode, async lookup is
unsupported.
if (!inputInsertOnly && upsertMaterialize) {
userDefinedFunction =
- LookupJoinUtil.getLookupFunction(temporalTable,
lookupKeys.keySet(), true);
+ LookupJoinUtil.getLookupFunction(
+ temporalTable,
+ lookupKeys.keySet(),
+ planner.getFlinkContext().getClassLoader(),
+ true);
UserDefinedFunctionHelper.prepareInstance(config,
userDefinedFunction);
return createSyncLookupJoinWithState(
@@ -268,7 +272,10 @@ public abstract class CommonExecLookupJoin extends
ExecNodeBase<RowData>
lookupKeyContainsPrimaryKey);
} else {
userDefinedFunction =
- LookupJoinUtil.getLookupFunction(temporalTable,
lookupKeys.keySet());
+ LookupJoinUtil.getLookupFunction(
+ temporalTable,
+ lookupKeys.keySet(),
+ planner.getFlinkContext().getClassLoader());
if (userDefinedFunction instanceof AsyncTableFunction) {
isAsyncEnabled = true;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
index eacab8d7b2c..fff2f95f0f5 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
@@ -19,20 +19,30 @@
package org.apache.flink.table.planner.plan.utils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
/** Utility for KeySelector. */
public class KeySelectorUtil {
+ public static RowDataKeySelector getRowDataSelector(
+ ClassLoader classLoader, int[] keyFields,
InternalTypeInfo<RowData> rowType) {
+ return getRowDataSelector(classLoader, keyFields, rowType,
BinaryRowData.class);
+ }
+
/**
* Create a RowDataKeySelector to extract keys from DataStream which type
is {@link
* InternalTypeInfo} of {@link RowData}.
@@ -44,7 +54,10 @@ public class KeySelectorUtil {
* InternalTypeInfo} of {@link RowData}.
*/
public static RowDataKeySelector getRowDataSelector(
- ClassLoader classLoader, int[] keyFields,
InternalTypeInfo<RowData> rowType) {
+ ClassLoader classLoader,
+ int[] keyFields,
+ InternalTypeInfo<RowData> rowType,
+ Class<? extends RowData> outClass) {
if (keyFields.length > 0) {
LogicalType[] inputFieldTypes = rowType.toRowFieldTypes();
LogicalType[] keyFieldTypes = new LogicalType[keyFields.length];
@@ -61,9 +74,19 @@ public class KeySelectorUtil {
"KeyProjection",
inputType,
returnType,
- keyFields);
+ keyFields,
+ outClass);
InternalTypeInfo<RowData> keyRowType =
InternalTypeInfo.of(returnType);
- return new BinaryRowDataKeySelector(keyRowType,
generatedProjection);
+ if (outClass == BinaryRowData.class) {
+ return new BinaryRowDataKeySelector(keyRowType,
generatedProjection);
+ } else if (outClass == GenericRowData.class) {
+ RowDataSerializer keySerializer =
InternalSerializers.create(returnType);
+ return new GenericRowDataKeySelector(
+ keyRowType, keySerializer, generatedProjection);
+ } else {
+ throw new UnsupportedOperationException(
+ "Currently only GenericRowData and BinaryRowData
supported as outClass of KeySelector.");
+ }
} else {
return EmptyRowDataKeySelector.INSTANCE;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index fffe805cb08..5545dda5da6 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -19,22 +19,38 @@
package org.apache.flink.table.planner.plan.utils;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
+import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import
org.apache.flink.table.connector.source.lookup.FullCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import
org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import
org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
import
org.apache.flink.table.runtime.functions.table.lookup.CachingAsyncLookupFunction;
import
org.apache.flink.table.runtime.functions.table.lookup.CachingLookupFunction;
+import
org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
+import
org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
+import
org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader;
+import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -153,8 +169,8 @@ public final class LookupJoinUtil {
/** Gets LookupFunction from temporal table according to the given lookup
keys. */
public static UserDefinedFunction getLookupFunction(
- RelOptTable temporalTable, Collection<Integer> lookupKeys) {
- return getLookupFunction(temporalTable, lookupKeys, false);
+ RelOptTable temporalTable, Collection<Integer> lookupKeys,
ClassLoader classLoader) {
+ return getLookupFunction(temporalTable, lookupKeys, classLoader,
false);
}
/**
@@ -163,23 +179,16 @@ public final class LookupJoinUtil {
* implemented.
*/
public static UserDefinedFunction getLookupFunction(
- RelOptTable temporalTable, Collection<Integer> lookupKeys, boolean
requireSyncLookup) {
+ RelOptTable temporalTable,
+ Collection<Integer> lookupKeys,
+ ClassLoader classLoader,
+ boolean requireSyncLookup) {
int[] lookupKeyIndicesInOrder = getOrderedLookupKeys(lookupKeys);
if (temporalTable instanceof TableSourceTable) {
- // TODO: support nested lookup keys in the future,
- // currently we only support top-level lookup keys
- int[][] indices =
- IntStream.of(lookupKeyIndicesInOrder)
- .mapToObj(i -> new int[] {i})
- .toArray(int[][]::new);
- LookupTableSource tableSource =
- (LookupTableSource) ((TableSourceTable)
temporalTable).tableSource();
- LookupRuntimeProviderContext providerContext =
- new LookupRuntimeProviderContext(indices);
LookupTableSource.LookupRuntimeProvider provider =
- tableSource.getLookupRuntimeProvider(providerContext);
+ getLookupRuntimeProvider(temporalTable, lookupKeys);
// TODO this method will be refactored in FLINK-28848
if (requireSyncLookup
@@ -199,6 +208,19 @@ public final class LookupJoinUtil {
return new CachingLookupFunction(
partialCachingLookupProvider.getCache(),
partialCachingLookupProvider.createLookupFunction());
+ } else if (provider instanceof FullCachingLookupProvider) {
+ FullCachingLookupProvider fullCachingLookupProvider =
+ (FullCachingLookupProvider) provider;
+ RowType tableSourceRowType =
+
FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
+ LookupFullCache fullCache =
+ createFullCache(
+ fullCachingLookupProvider,
+ lookupKeyIndicesInOrder,
+ classLoader,
+ tableSourceRowType);
+ return new CachingLookupFunction(
+ fullCache,
fullCachingLookupProvider.createLookupFunction());
}
return ((LookupFunctionProvider)
provider).createLookupFunction();
} else if (provider instanceof AsyncLookupFunctionProvider) {
@@ -246,4 +268,77 @@ public final class LookupJoinUtil {
"table %s is neither TableSourceTable not
LegacyTableSourceTable",
temporalTable.getQualifiedName()));
}
+
+ public static boolean isAsyncLookup(RelOptTable temporalTable,
Collection<Integer> lookupKeys) {
+ if (temporalTable instanceof TableSourceTable) {
+ LookupTableSource.LookupRuntimeProvider provider =
+ getLookupRuntimeProvider(temporalTable, lookupKeys);
+ return provider instanceof AsyncLookupFunctionProvider
+ || provider instanceof AsyncTableFunctionProvider;
+ } else if (temporalTable instanceof LegacyTableSourceTable) {
+ LegacyTableSourceTable<?> legacyTableSourceTable =
+ (LegacyTableSourceTable<?>) temporalTable;
+ LookupableTableSource<?> lookupableTableSource =
+ (LookupableTableSource<?>)
legacyTableSourceTable.tableSource();
+ return lookupableTableSource.isAsyncEnabled();
+ }
+ throw new TableException(
+ String.format(
+ "table %s is neither TableSourceTable not
LegacyTableSourceTable",
+ temporalTable.getQualifiedName()));
+ }
+
+ private static LookupTableSource.LookupRuntimeProvider
getLookupRuntimeProvider(
+ RelOptTable temporalTable, Collection<Integer> lookupKeys) {
+ int[] lookupKeyIndicesInOrder = getOrderedLookupKeys(lookupKeys);
+ // TODO: support nested lookup keys in the future,
+ // currently we only support top-level lookup keys
+ int[][] indices =
+ IntStream.of(lookupKeyIndicesInOrder)
+ .mapToObj(i -> new int[] {i})
+ .toArray(int[][]::new);
+ LookupTableSource tableSource =
+ (LookupTableSource) ((TableSourceTable)
temporalTable).tableSource();
+ LookupRuntimeProviderContext providerContext = new
LookupRuntimeProviderContext(indices);
+ return tableSource.getLookupRuntimeProvider(providerContext);
+ }
+
+ private static LookupFullCache createFullCache(
+ FullCachingLookupProvider provider,
+ int[] lookupKeyIndicesInOrder,
+ ClassLoader classLoader,
+ RowType tableSourceRowType) {
+
+ ScanTableSource.ScanRuntimeProvider scanProvider =
provider.getScanRuntimeProvider();
+ Preconditions.checkArgument(
+ scanProvider.isBounded(),
+ "ScanRuntimeProvider that is used for data loading in "
+ + "lookup 'FULL' cache must be bounded.");
+
+ GenericRowDataKeySelector lookupTableKeySelector =
+ (GenericRowDataKeySelector)
+ KeySelectorUtil.getRowDataSelector(
+ classLoader,
+ lookupKeyIndicesInOrder,
+ InternalTypeInfo.of(tableSourceRowType),
+ GenericRowData.class);
+
+ if (scanProvider instanceof InputFormatProvider) {
+ InputFormat<RowData, ?> inputFormat =
+ ((InputFormatProvider) scanProvider).createInputFormat();
+ CacheLoader cacheLoader =
+ new InputFormatCacheLoader(
+ inputFormat,
+ lookupTableKeySelector,
+ InternalSerializers.create(tableSourceRowType));
+ return new LookupFullCache(cacheLoader,
provider.getCacheReloadTrigger());
+ } else if (scanProvider instanceof SourceFunctionProvider) {
+ // TODO support SourceFunctions
+ throw new UnsupportedOperationException(
+ "Full caching using SourceFunction currently not
supported.");
+ } else {
+ throw new UnsupportedOperationException(
+ "Currently only InputFormatProvider and
SourceFunctionProvider are supported as ScanRuntimeProviders for Full caching
lookup join.");
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
index 82d3b7c7ad0..496af1394fe 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
@@ -138,4 +138,21 @@ object ProjectionCodeGenerator {
outputType,
inputMapping,
inputTerm = DEFAULT_INPUT1_TERM)
+
+ /** For java invoke. */
+ def generateProjection(
+ ctx: CodeGeneratorContext,
+ name: String,
+ inputType: RowType,
+ outputType: RowType,
+ inputMapping: Array[Int],
+ outClass: Class[_ <: RowData]): GeneratedProjection =
+ generateProjection(
+ ctx,
+ name,
+ inputType,
+ outputType,
+ inputMapping,
+ outClass = outClass,
+ inputTerm = DEFAULT_INPUT1_TERM)
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
index 0d6d964c1fe..d8193eed182 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
@@ -19,7 +19,6 @@ package
org.apache.flink.table.planner.plan.nodes.physical.common
import org.apache.flink.table.api.TableException
import org.apache.flink.table.catalog.{ObjectIdentifier, UniqueConstraint}
-import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction,
UserDefinedFunction}
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.schema.{IntermediateRelTable,
LegacyTableSourceTable, TableSourceTable}
@@ -160,12 +159,8 @@ abstract class CommonPhysicalLookupJoin(
case t: LegacyTableSourceTable[_] => t.tableIdentifier
}
- val lookupFunction: UserDefinedFunction =
- LookupJoinUtil.getLookupFunction(temporalTable,
allLookupKeys.keys.map(Int.box).toList.asJava)
- val isAsyncEnabled: Boolean = lookupFunction match {
- case _: TableFunction[_] => false
- case _: AsyncTableFunction[_] => true
- }
+ val isAsyncEnabled: Boolean =
+ LookupJoinUtil.isAsyncLookup(temporalTable,
allLookupKeys.keys.map(Int.box).toList.asJava)
super
.explainTerms(pw)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 44d7c537be2..60967f8494d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -69,13 +69,17 @@ import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata
import
org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import
org.apache.flink.table.connector.source.lookup.FullCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import
org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+import
org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -100,6 +104,7 @@ import
org.apache.flink.table.planner.functions.aggfunctions.SumAggFunction;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.utils.FilterUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import
org.apache.flink.table.runtime.functions.table.fullcache.FullCacheTestInputFormat;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -136,6 +141,11 @@ import java.util.stream.Collectors;
import scala.collection.Seq;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
@@ -410,6 +420,10 @@ public final class TestValuesTableFactory
if
(helper.getOptions().get(CACHE_TYPE).equals(LookupOptions.LookupCacheType.PARTIAL))
{
cache = DefaultLookupCache.fromConfig(helper.getOptions());
}
+ CacheReloadTrigger reloadTrigger = null;
+ if
(helper.getOptions().get(CACHE_TYPE).equals(LookupOptions.LookupCacheType.FULL))
{
+ reloadTrigger =
PeriodicCacheReloadTrigger.fromConfig(helper.getOptions());
+ }
Optional<List<String>> filterableFields =
helper.getOptions().getOptional(FILTERABLE_FIELDS);
@@ -525,7 +539,8 @@ public final class TestValuesTableFactory
partitions,
readableMetadata,
null,
- cache);
+ cache,
+ reloadTrigger);
}
} else {
try {
@@ -631,7 +646,12 @@ public final class TestValuesTableFactory
PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
PARTIAL_CACHE_CACHE_MISSING_KEY,
- PARTIAL_CACHE_MAX_ROWS));
+ PARTIAL_CACHE_MAX_ROWS,
+ FULL_CACHE_RELOAD_STRATEGY,
+ FULL_CACHE_PERIODIC_RELOAD_INTERVAL,
+ FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE,
+ FULL_CACHE_TIMED_RELOAD_ISO_TIME,
+ FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS));
}
private static int validateAndExtractRowtimeIndex(
@@ -1474,6 +1494,7 @@ public final class TestValuesTableFactory
private final @Nullable String lookupFunctionClass;
private final @Nullable LookupCache cache;
+ private final @Nullable CacheReloadTrigger reloadTrigger;
private final boolean isAsync;
private TestValuesScanLookupTableSource(
@@ -1495,7 +1516,8 @@ public final class TestValuesTableFactory
List<Map<String, String>> allPartitions,
Map<String, DataType> readableMetadata,
@Nullable int[] projectedMetadataFields,
- @Nullable LookupCache cache) {
+ @Nullable LookupCache cache,
+ @Nullable CacheReloadTrigger reloadTrigger) {
super(
producedDataType,
changelogMode,
@@ -1516,6 +1538,7 @@ public final class TestValuesTableFactory
this.lookupFunctionClass = lookupFunctionClass;
this.isAsync = isAsync;
this.cache = cache;
+ this.reloadTrigger = reloadTrigger;
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -1569,10 +1592,18 @@ public final class TestValuesTableFactory
} else {
TestValuesLookupFunction lookupFunction =
new TestValuesLookupFunction(data, lookupIndices,
converter);
- if (cache == null) {
- return LookupFunctionProvider.of(lookupFunction);
- } else {
+ if (cache != null) {
return PartialCachingLookupProvider.of(lookupFunction,
cache);
+ } else if (reloadTrigger != null) {
+ DataFormatConverters.RowConverter rowConverter =
+ new DataFormatConverters.RowConverter(
+ producedDataType.getChildren().toArray(new
DataType[] {}));
+ FullCacheTestInputFormat inputFormat =
+ new FullCacheTestInputFormat(data, rowConverter);
+ return FullCachingLookupProvider.of(
+ InputFormatProvider.of(inputFormat),
reloadTrigger);
+ } else {
+ return LookupFunctionProvider.of(lookupFunction);
}
}
}
@@ -1598,7 +1629,8 @@ public final class TestValuesTableFactory
allPartitions,
readableMetadata,
projectedMetadataFields,
- cache);
+ cache,
+ reloadTrigger);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
index aaf22ab7150..219c6bbef12 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.runtime.batch.sql.join
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.connector.source.lookup.LookupOptions
+import
org.apache.flink.table.connector.source.lookup.LookupOptions.{LookupCacheType,
ReloadStrategy}
import org.apache.flink.table.data.GenericRowData
import org.apache.flink.table.data.binary.BinaryStringData
import org.apache.flink.table.planner.factories.TestValuesTableFactory
@@ -39,7 +40,7 @@ import java.util
import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
-class LookupJoinITCase(legacyTableSource: Boolean, isAsyncMode: Boolean,
enableCache: Boolean)
+class LookupJoinITCase(legacyTableSource: Boolean, isAsyncMode: Boolean,
cacheType: LookupCacheType)
extends BatchTestBase {
val data = List(
@@ -104,11 +105,17 @@ class LookupJoinITCase(legacyTableSource: Boolean,
isAsyncMode: Boolean, enableC
} else {
val dataId = TestValuesTableFactory.registerData(data)
val cacheOptions =
- if (enableCache)
+ if (cacheType == LookupCacheType.PARTIAL)
s"""
- | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupOptions.LookupCacheType.PARTIAL}',
+ | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupCacheType.PARTIAL}',
| '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' =
'${Long.MaxValue}',
|""".stripMargin
+ else if (cacheType == LookupCacheType.FULL)
+ s"""
+ | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupCacheType.FULL}',
+ | '${LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key()}' =
'${ReloadStrategy.PERIODIC}',
+ | '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' =
'${Long.MaxValue}',
+ |""".stripMargin
else ""
tEnv.executeSql(s"""
@@ -131,11 +138,17 @@ class LookupJoinITCase(legacyTableSource: Boolean,
isAsyncMode: Boolean, enableC
if (!legacyTableSource) {
val dataId = TestValuesTableFactory.registerData(data)
val cacheOptions =
- if (enableCache)
+ if (cacheType == LookupCacheType.PARTIAL)
s"""
- | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupOptions.LookupCacheType.PARTIAL}',
+ | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupCacheType.PARTIAL}',
| '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' =
'${Long.MaxValue}',
|""".stripMargin
+ else if (cacheType == LookupCacheType.FULL)
+ s"""
+ | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupCacheType.FULL}',
+ | '${LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key()}' =
'${ReloadStrategy.PERIODIC}',
+ | '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' =
'${Long.MaxValue}',
+ |""".stripMargin
else ""
tEnv.executeSql(s"""
|CREATE TABLE $tableName (
@@ -331,7 +344,7 @@ class LookupJoinITCase(legacyTableSource: Boolean,
isAsyncMode: Boolean, enableC
@Test
def testLookupCacheSharingAcrossSubtasks(): Unit = {
- if (!enableCache) {
+ if (cacheType == LookupCacheType.NONE) {
return
}
// Keep the cache for later validation
@@ -364,9 +377,10 @@ class LookupJoinITCase(legacyTableSource: Boolean,
isAsyncMode: Boolean, enableC
val managedCaches = LookupCacheManager.getInstance().getManagedCaches
assertThat(managedCaches.size()).isEqualTo(1)
- // Validate 6 entries are cached
+ val numEntries = if (cacheType == LookupCacheType.PARTIAL) 6 else
userData.size
+ // Validate 6 entries are cached for PARTIAL and all entries for FULL
val cache =
managedCaches.get(managedCaches.keySet().iterator().next()).getCache
- assertThat(cache.size()).isEqualTo(6)
+ assertThat(cache.size()).isEqualTo(numEntries)
// Validate contents of cached entries
assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(1L))))
@@ -401,18 +415,17 @@ object LookupJoinITCase {
val DYNAMIC_TABLE_SOURCE: JBoolean = JBoolean.FALSE;
val ASYNC_MODE: JBoolean = JBoolean.TRUE;
val SYNC_MODE: JBoolean = JBoolean.FALSE;
- val ENABLE_CACHE: JBoolean = JBoolean.TRUE;
- val DISABLE_CACHE: JBoolean = JBoolean.FALSE;
- @Parameterized.Parameters(name = "LegacyTableSource={0}, isAsyncMode = {1},
enableCache = {2}")
+ @Parameterized.Parameters(name = "LegacyTableSource={0}, isAsyncMode = {1},
cacheType = {2}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](
- Array(LEGACY_TABLE_SOURCE, ASYNC_MODE, DISABLE_CACHE),
- Array(LEGACY_TABLE_SOURCE, SYNC_MODE, DISABLE_CACHE),
- Array(DYNAMIC_TABLE_SOURCE, ASYNC_MODE, DISABLE_CACHE),
- Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, DISABLE_CACHE),
- Array(DYNAMIC_TABLE_SOURCE, ASYNC_MODE, ENABLE_CACHE),
- Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, ENABLE_CACHE)
+ Array(LEGACY_TABLE_SOURCE, ASYNC_MODE, LookupCacheType.NONE),
+ Array(LEGACY_TABLE_SOURCE, SYNC_MODE, LookupCacheType.NONE),
+ Array(DYNAMIC_TABLE_SOURCE, ASYNC_MODE, LookupCacheType.NONE),
+ Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, LookupCacheType.NONE),
+ Array(DYNAMIC_TABLE_SOURCE, ASYNC_MODE, LookupCacheType.PARTIAL),
+ Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, LookupCacheType.PARTIAL),
+ Array(DYNAMIC_TABLE_SOURCE, SYNC_MODE, LookupCacheType.FULL)
)
}
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index 748026bf750..752a903ca39 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.connector.source.lookup.LookupOptions
+import
org.apache.flink.table.connector.source.lookup.LookupOptions.{LookupCacheType,
ReloadStrategy}
import org.apache.flink.table.data.GenericRowData
import org.apache.flink.table.data.binary.BinaryStringData
import org.apache.flink.table.planner.factories.TestValuesTableFactory
@@ -44,7 +45,8 @@ import java.util.{Collection => JCollection}
import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
-class LookupJoinITCase(legacyTableSource: Boolean, enableCache: Boolean)
extends StreamingTestBase {
+class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
+ extends StreamingTestBase {
val data = List(
rowOf(1L, 12, "Julian"),
@@ -108,11 +110,17 @@ class LookupJoinITCase(legacyTableSource: Boolean,
enableCache: Boolean) extends
} else {
val dataId = TestValuesTableFactory.registerData(data)
val cacheOptions =
- if (enableCache)
+ if (cacheType == LookupCacheType.PARTIAL)
s"""
- | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupOptions.LookupCacheType.PARTIAL}',
+ | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupCacheType.PARTIAL}',
| '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' =
'${Long.MaxValue}',
|""".stripMargin
+ else if (cacheType == LookupCacheType.FULL)
+ s"""
+ | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupCacheType.FULL}',
+ | '${LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key()}' =
'${ReloadStrategy.PERIODIC}',
+ | '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' =
'${Long.MaxValue}',
+ |""".stripMargin
else ""
tEnv.executeSql(s"""
@@ -133,11 +141,17 @@ class LookupJoinITCase(legacyTableSource: Boolean,
enableCache: Boolean) extends
if (!legacyTableSource) {
val dataId = TestValuesTableFactory.registerData(data)
val cacheOptions =
- if (enableCache)
+ if (cacheType == LookupCacheType.PARTIAL)
s"""
- | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupOptions.LookupCacheType.PARTIAL}',
+ | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupCacheType.PARTIAL}',
| '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' =
'${Long.MaxValue}',
|""".stripMargin
+ else if (cacheType == LookupCacheType.FULL)
+ s"""
+ | '${LookupOptions.CACHE_TYPE.key()}' =
'${LookupCacheType.FULL}',
+ | '${LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key()}' =
'${ReloadStrategy.PERIODIC}',
+ | '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' =
'${Long.MaxValue}',
+ |""".stripMargin
else ""
tEnv.executeSql(s"""
|CREATE TABLE $tableName (
@@ -556,7 +570,7 @@ class LookupJoinITCase(legacyTableSource: Boolean,
enableCache: Boolean) extends
@Test
def testLookupCacheSharingAcrossSubtasks(): Unit = {
- if (!enableCache) {
+ if (cacheType == LookupCacheType.NONE) {
return
}
// Keep the cache for later validation
@@ -590,9 +604,10 @@ class LookupJoinITCase(legacyTableSource: Boolean,
enableCache: Boolean) extends
val managedCaches = LookupCacheManager.getInstance().getManagedCaches
assertThat(managedCaches.size()).isEqualTo(1)
- // Validate 6 entries are cached
+ val numEntries = if (cacheType == LookupCacheType.PARTIAL) 6 else
userData.size
+ // Validate 6 entries are cached for PARTIAL and all entries for FULL
val cache =
managedCaches.get(managedCaches.keySet().iterator().next()).getCache
- assertThat(cache.size()).isEqualTo(6)
+ assertThat(cache.size()).isEqualTo(numEntries)
// Validate contents of cached entries
assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(1L))))
@@ -699,14 +714,14 @@ object LookupJoinITCase {
val LEGACY_TABLE_SOURCE: JBoolean = JBoolean.TRUE;
val DYNAMIC_TABLE_SOURCE: JBoolean = JBoolean.FALSE;
- val ENABLE_CACHE: JBoolean = JBoolean.TRUE;
- val DISABLE_CACHE: JBoolean = JBoolean.FALSE;
- @Parameterized.Parameters(name = "LegacyTableSource={0}, EnableCache={1}")
+ @Parameterized.Parameters(name = "LegacyTableSource={0}, cacheType={1}")
def parameters(): JCollection[Array[Object]] = {
Seq[Array[AnyRef]](
- Array(LEGACY_TABLE_SOURCE, DISABLE_CACHE),
- Array(DYNAMIC_TABLE_SOURCE, ENABLE_CACHE),
- Array(DYNAMIC_TABLE_SOURCE, DISABLE_CACHE))
+ Array(LEGACY_TABLE_SOURCE, LookupCacheType.NONE),
+ Array(DYNAMIC_TABLE_SOURCE, LookupCacheType.NONE),
+ Array(DYNAMIC_TABLE_SOURCE, LookupCacheType.PARTIAL),
+ Array(DYNAMIC_TABLE_SOURCE, LookupCacheType.FULL)
+ )
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java
index d3bbf3b6d7c..6fe97bb6fc9 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.functions.table.lookup;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.CacheMetricGroup;
@@ -29,6 +30,10 @@ import
org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
+import
org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
@@ -52,7 +57,7 @@ public class CachingLookupFunction extends LookupFunction {
private static final long UNINITIALIZED = -1;
// The actual user-provided lookup function
- private final LookupFunction delegate;
+ @Nullable private final LookupFunction delegate;
private LookupCache cache;
private transient String cacheIdentifier;
@@ -70,7 +75,7 @@ public class CachingLookupFunction extends LookupFunction {
* actual cache instance will be retrieved from the {@link
LookupCacheManager} during {@link
* #open}.
*/
- public CachingLookupFunction(LookupCache cache, LookupFunction delegate) {
+ public CachingLookupFunction(LookupCache cache, @Nullable LookupFunction
delegate) {
this.cache = cache;
this.delegate = delegate;
}
@@ -103,7 +108,13 @@ public class CachingLookupFunction extends LookupFunction {
// Initialize cache and the delegating function
cache.open(cacheMetricGroup);
- delegate.open(context);
+ if (cache instanceof LookupFullCache) {
+ // TODO add Configuration into FunctionContext
+ ((LookupFullCache) cache).open(new Configuration());
+ }
+ if (delegate != null) {
+ delegate.open(context);
+ }
}
@Override
@@ -128,7 +139,9 @@ public class CachingLookupFunction extends LookupFunction {
@Override
public void close() throws Exception {
- delegate.close();
+ if (delegate != null) {
+ delegate.close();
+ }
if (cacheIdentifier != null) {
LookupCacheManager.getInstance().unregisterCache(cacheIdentifier);
}
@@ -142,6 +155,9 @@ public class CachingLookupFunction extends LookupFunction {
// -------------------------------- Helper functions
------------------------------
private Collection<RowData> lookupByDelegate(RowData keyRow) throws
IOException {
try {
+ Preconditions.checkState(
+ delegate != null,
+ "User's lookup function can't be null, if there are
possible cache misses.");
Collection<RowData> lookupValues = delegate.lookup(keyRow);
loadCounter.inc();
updateLatestLoadTime();
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
new file mode 100644
index 00000000000..bbd099833ab
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.lookup.fullcache;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Abstract task that loads data in Full cache from source provided by {@link ScanRuntimeProvider}.
+ *
+ * <p>Subclasses implement {@link #reloadCache()} and are expected to assign a freshly built map
+ * to {@link #cache}. Until the first successful reload the {@link #cache} field is {@code null},
+ * so every access inside this class goes through a null check.
+ */
+public abstract class CacheLoader extends AbstractRichFunction implements Runnable, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(CacheLoader.class);
+
+    // stays null until a reload publishes a map; volatile so that readers see the new reference
+    protected transient volatile ConcurrentHashMap<RowData, Collection<RowData>> cache;
+
+    // 2 reloads can't be executed simultaneously, so they are performed under lock
+    private final ReentrantLock reloadLock = new ReentrantLock();
+    // runtime waits for the first load to complete to start an execution lookup join
+    private CountDownLatch firstLoadLatch;
+
+    // Cache metrics
+    private transient Counter loadCounter;
+    private transient Counter loadFailuresCounter;
+    private transient volatile long latestLoadTimeMs;
+
+    /**
+     * Performs one complete reload. Invoked from {@link #run()} while the reload lock is held.
+     *
+     * @throws Exception if the data can't be loaded; {@link #run()} counts it as a load failure
+     */
+    protected abstract void reloadCache() throws Exception;
+
+    /** Creates the first-load latch and the metric counters. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        firstLoadLatch = new CountDownLatch(1);
+        loadCounter = new ThreadSafeSimpleCounter();
+        loadFailuresCounter = new ThreadSafeSimpleCounter();
+    }
+
+    /** Registers the counters and gauges of this loader in the given metric group. */
+    public void open(CacheMetricGroup cacheMetricGroup) {
+        // Register metrics
+        cacheMetricGroup.loadCounter(loadCounter);
+        cacheMetricGroup.numLoadFailuresCounter(loadFailuresCounter);
+        // the gauge can be polled before the first load has published a cache
+        cacheMetricGroup.numCachedRecordsGauge(this::numCachedKeys);
+        cacheMetricGroup.latestLoadTimeGauge(() -> latestLoadTimeMs);
+    }
+
+    /**
+     * Returns the currently published cache.
+     *
+     * @return the cache, or {@code null} if no reload has published one yet
+     */
+    public ConcurrentHashMap<RowData, Collection<RowData>> getCache() {
+        return cache;
+    }
+
+    /**
+     * Blocks until the first invocation of {@link #run()} has finished, successfully or not.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public void awaitFirstLoad() throws InterruptedException {
+        firstLoadLatch.await();
+    }
+
+    /**
+     * Executes one reload under the reload lock and updates the load metrics.
+     *
+     * @throws RuntimeException wrapping any exception thrown during the reload
+     */
+    @Override
+    public void run() {
+        // 2 reloads can't be executed simultaneously
+        reloadLock.lock();
+        try {
+            LOG.info("Lookup 'FULL' cache loading triggered.");
+            long start = System.currentTimeMillis();
+            reloadCache();
+            latestLoadTimeMs = System.currentTimeMillis() - start;
+            loadCounter.inc();
+            // a reload may finish without publishing a cache, so read the field once and guard it
+            ConcurrentHashMap<RowData, Collection<RowData>> loaded = cache;
+            LOG.info(
+                    "Lookup 'FULL' cache loading finished. Time elapsed - {} ms. Number of records - {}.",
+                    latestLoadTimeMs,
+                    loaded == null ? 0 : loaded.size());
+            if (loaded != null && LOG.isDebugEnabled()) {
+                // 'if' guard statement prevents us from transforming cache to string
+                LOG.debug(
+                        "Cache content: \n{\n\t{}\n}",
+                        Joiner.on(",\n\t").withKeyValueSeparator(" = ").join(loaded));
+            }
+        } catch (Exception e) {
+            loadFailuresCounter.inc();
+            throw new RuntimeException("Failed to reload lookup 'FULL' cache.", e);
+        } finally {
+            reloadLock.unlock();
+            // released even on failure, otherwise awaitFirstLoad() would block forever
+            firstLoadLatch.countDown();
+        }
+    }
+
+    /** Clears the published cache, if there is one. */
+    @Override
+    public void close() throws Exception {
+        ConcurrentHashMap<RowData, Collection<RowData>> current = cache;
+        if (current != null) {
+            current.clear();
+        }
+    }
+
+    /** Number of keys in the published cache, or 0 if nothing has been published yet. */
+    private long numCachedKeys() {
+        ConcurrentHashMap<RowData, Collection<RowData>> current = cache;
+        return current == null ? 0L : (long) current.size();
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
new file mode 100644
index 00000000000..0a631766199
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.lookup.fullcache;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import
org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Internal implementation of {@link LookupCache} for {@link LookupCacheType#FULL}.
+ *
+ * <p>The content is owned by a {@link CacheLoader} and is replaced as a whole whenever the {@link
+ * CacheReloadTrigger} requests a reload, therefore {@link #put} and {@link #invalidate} are not
+ * supported.
+ */
+public class LookupFullCache implements LookupCache {
+    private static final long serialVersionUID = 1L;
+
+    private final CacheLoader cacheLoader;
+    private final CacheReloadTrigger reloadTrigger;
+
+    // a non-null context also marks that open(Configuration) has already been executed
+    private transient volatile ReloadTriggerContext reloadTriggerContext;
+    // first reload failure; later failures are attached to it as suppressed exceptions
+    private transient volatile Throwable reloadFailCause;
+
+    public LookupFullCache(CacheLoader cacheLoader, CacheReloadTrigger reloadTrigger) {
+        this.cacheLoader = Preconditions.checkNotNull(cacheLoader);
+        this.reloadTrigger = Preconditions.checkNotNull(reloadTrigger);
+    }
+
+    /** Registers the metrics of the underlying loader in the given metric group. */
+    @Override
+    public synchronized void open(CacheMetricGroup metricGroup) {
+        cacheLoader.open(metricGroup);
+    }
+
+    /**
+     * Opens the loader and the reload trigger, then blocks until the first load has finished.
+     * Calls after the first one are no-ops.
+     *
+     * @throws Exception if the loader or the trigger can't be opened, or the wait is interrupted
+     */
+    public synchronized void open(Configuration parameters) throws Exception {
+        if (reloadTriggerContext == null) {
+            cacheLoader.open(parameters);
+            reloadTriggerContext =
+                    new ReloadTriggerContext(
+                            cacheLoader,
+                            th -> {
+                                if (reloadFailCause == null) {
+                                    reloadFailCause = th;
+                                } else {
+                                    reloadFailCause.addSuppressed(th);
+                                }
+                            });
+
+            reloadTrigger.open(reloadTriggerContext);
+            cacheLoader.awaitFirstLoad();
+        }
+    }
+
+    /**
+     * Returns the rows cached for the key, or an empty collection if the key is absent.
+     *
+     * @throws RuntimeException if a reload has failed
+     * @throws IllegalStateException if the loader has not published any cache yet
+     */
+    @Override
+    public Collection<RowData> getIfPresent(RowData key) {
+        if (reloadFailCause != null) {
+            throw new RuntimeException(reloadFailCause);
+        }
+        if (cacheLoader.getCache() == null) {
+            // The loader releases the first-load latch before a failure reaches the callback,
+            // so a failed first load can be observed here as "no cache and no cause yet".
+            // Fail with a clear message instead of a NullPointerException.
+            throw new IllegalStateException(
+                    "Lookup 'FULL' cache has not been loaded.", reloadFailCause);
+        }
+        return cacheLoader.getCache().getOrDefault(key, Collections.emptyList());
+    }
+
+    @Override
+    public Collection<RowData> put(RowData key, Collection<RowData> value) {
+        throw new UnsupportedOperationException(
+                "Lookup Full cache doesn't support public 'put' operation from the outside.");
+    }
+
+    @Override
+    public void invalidate(RowData key) {
+        throw new UnsupportedOperationException(
+                "Lookup Full cache doesn't support public 'invalidate' operation from the outside.");
+    }
+
+    /** Returns the number of cached keys, or 0 if no cache has been published yet. */
+    @Override
+    public long size() {
+        return cacheLoader.getCache() == null ? 0L : cacheLoader.getCache().size();
+    }
+
+    /** Closes the loader and the reload trigger. */
+    @Override
+    public void close() throws Exception {
+        try {
+            cacheLoader.close();
+        } finally {
+            // the trigger must be closed even if closing the loader fails
+            reloadTrigger.close();
+        }
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/ReloadTriggerContext.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/ReloadTriggerContext.java
new file mode 100644
index 00000000000..f1dad16113a
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/ReloadTriggerContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.lookup.fullcache;
+
+import
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Runtime implementation of {@link CacheReloadTrigger.Context}.
+ *
+ * <p>Wraps the reload task together with a callback that is notified when an asynchronously
+ * started reload completes exceptionally.
+ */
+public class ReloadTriggerContext implements CacheReloadTrigger.Context {
+
+    private final Runnable reloadTask;
+    private final Consumer<Throwable> reloadFailCallback;
+
+    public ReloadTriggerContext(Runnable reloadTask, Consumer<Throwable> reloadFailCallback) {
+        this.reloadTask = reloadTask;
+        this.reloadFailCallback = reloadFailCallback;
+    }
+
+    /** Returns the current system time in milliseconds. */
+    @Override
+    public long currentProcessingTime() {
+        // TODO add processingTime into FunctionContext
+        return System.currentTimeMillis();
+    }
+
+    /** Not supported yet, always throws {@link UnsupportedOperationException}. */
+    @Override
+    public long currentWatermark() {
+        // TODO add watermarks into FunctionContext
+        throw new UnsupportedOperationException(
+                "Watermarks are currently unsupported in cache reload triggers.");
+    }
+
+    /**
+     * Starts the reload task asynchronously.
+     *
+     * @return future that completes normally in any case; a failure of the task is handed over
+     *     to the failure callback instead of being propagated through the future
+     */
+    @Override
+    public CompletableFuture<Void> triggerReload() {
+        CompletableFuture<Void> reload = CompletableFuture.runAsync(reloadTask);
+        return reload.exceptionally(this::reportFailure);
+    }
+
+    /** Forwards the failure to the callback and supplies the {@code null} result of the future. */
+    private Void reportFailure(Throwable failure) {
+        reloadFailCallback.accept(failure);
+        return null;
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
new file mode 100644
index 00000000000..f7b95dbd694
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.table.data.RowData;
+import
org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
+import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CacheLoader} that uses {@link InputFormat} for loading data into the cache.
+ *
+ * <p>Every reload creates the input splits anew, reads each split with its own copy of the input
+ * format into a new map and publishes that map only after all splits have been read.
+ */
+public class InputFormatCacheLoader extends CacheLoader {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(InputFormatCacheLoader.class);
+
+    private final InputFormat<RowData, InputSplit> initialInputFormat;
+    private final GenericRowDataKeySelector keySelector;
+    private final RowDataSerializer cacheEntriesSerializer;
+
+    // tasks of the most recent reload, kept so that close() can stop them
+    private transient volatile List<InputSplitCacheLoadTask> cacheLoadTasks;
+    private transient Configuration parameters;
+
+    private volatile boolean isStopped;
+
+    public InputFormatCacheLoader(
+            InputFormat<RowData, ?> initialInputFormat,
+            GenericRowDataKeySelector keySelector,
+            RowDataSerializer cacheEntriesSerializer) {
+        // noinspection unchecked
+        this.initialInputFormat = (InputFormat<RowData, InputSplit>) initialInputFormat;
+        this.keySelector = keySelector;
+        this.cacheEntriesSerializer = cacheEntriesSerializer;
+    }
+
+    /** Opens the base loader and configures the initial input format with the parameters. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.parameters = parameters;
+        this.initialInputFormat.configure(parameters);
+    }
+
+    /**
+     * Reads all input splits into a new map and publishes it unless the loader was closed.
+     *
+     * @throws Exception if the splits can't be created or any load task fails
+     */
+    @Override
+    protected void reloadCache() throws Exception {
+        InputSplit[] inputSplits = createInputSplits();
+        int numSplits = inputSplits.length;
+        // load data into the another copy of cache
+        // notice: it requires twice more memory, but on the other hand we don't need any blocking
+        // cache has default initialCapacity and loadFactor, but overridden concurrencyLevel
+        ConcurrentHashMap<RowData, Collection<RowData>> newCache =
+                new ConcurrentHashMap<>(16, 0.75f, getConcurrencyLevel(numSplits));
+        List<InputSplitCacheLoadTask> tasks =
+                Arrays.stream(inputSplits)
+                        .map(split -> createCacheLoadTask(split, newCache))
+                        .collect(Collectors.toList());
+        this.cacheLoadTasks = tasks;
+
+        // run first task in the current thread and the remaining tasks in a thread pool
+        ExecutorService cacheLoadTaskService = null;
+        try {
+            List<Future<?>> futures = new ArrayList<>();
+            if (numSplits > 1) {
+                // The current thread runs one task itself, hence "- 1". At least one worker
+                // is required, because newFixedThreadPool rejects a size of 0 (several splits
+                // on a single-core machine).
+                int numThreads = Math.max(1, getConcurrencyLevel(numSplits) - 1);
+                cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
+                for (int i = 1; i < numSplits; i++) {
+                    futures.add(cacheLoadTaskService.submit(tasks.get(i)));
+                }
+            }
+            tasks.get(0).run();
+            for (Future<?> future : futures) {
+                future.get(); // if any exception occurs it will be thrown here
+            }
+        } finally {
+            // Always release the worker threads, also when a task has failed. Tasks that are
+            // still reading after a failure are asked to stop. After a successful load every
+            // task has already finished, so stopping is a no-op.
+            tasks.forEach(InputSplitCacheLoadTask::stopRunning);
+            if (cacheLoadTaskService != null) {
+                cacheLoadTaskService.shutdownNow();
+            }
+        }
+        if (!isStopped) {
+            // reassigning cache field is safe, because it's volatile
+            cache = newCache;
+        }
+    }
+
+    /** Stops the running load tasks and closes the base loader. */
+    @Override
+    public void close() throws Exception {
+        // Set the flag first, so that a reload finishing concurrently doesn't publish its
+        // result after the cache has been cleared by super.close().
+        isStopped = true;
+        List<InputSplitCacheLoadTask> tasks = cacheLoadTasks;
+        if (tasks != null) {
+            tasks.forEach(InputSplitCacheLoadTask::stopRunning);
+        }
+        super.close();
+    }
+
+    /** Creates the load task for one split with its own copies of the non-thread-safe parts. */
+    private InputSplitCacheLoadTask createCacheLoadTask(
+            InputSplit inputSplit, ConcurrentHashMap<RowData, Collection<RowData>> newCache) {
+        try {
+            // InputFormat and KeySelector are not thread-safe,
+            // so we create copies of them for each task
+            InputFormat<RowData, InputSplit> inputFormat =
+                    InstantiationUtil.clone(initialInputFormat);
+            inputFormat.configure(parameters);
+            return new InputSplitCacheLoadTask(
+                    newCache, keySelector.copy(), cacheEntriesSerializer, inputFormat, inputSplit);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create InputFormatCacheLoadTask", e);
+        }
+    }
+
+    /**
+     * Asks the initial input format for its splits.
+     *
+     * @throws IllegalStateException if the input format provides no split at all
+     */
+    private InputSplit[] createInputSplits() throws IOException {
+        InputSplit[] inputSplits = this.initialInputFormat.createInputSplits(1);
+        if (LOG.isDebugEnabled()) {
+            // 'if' guard statement prevents us from transforming splits to string
+            LOG.debug(
+                    "InputFormat created {} InputSplits: {}",
+                    inputSplits.length,
+                    Arrays.deepToString(inputSplits));
+        }
+        Preconditions.checkState(
+                inputSplits.length >= 1,
+                "InputFormat must provide at least one input split to load data into the lookup 'FULL' cache.");
+        return inputSplits;
+    }
+
+    /** Returns the number of splits, capped by the number of available processors. */
+    private int getConcurrencyLevel(int numSplits) {
+        // creating many threads will cause performance issues because of context switching
+        int numOfCores = Runtime.getRuntime().availableProcessors();
+        return Math.min(numSplits, numOfCores);
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
new file mode 100644
index 00000000000..8610b206fff
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Parallel task that loads data into the cache from {@link InputFormat} with specified {@link
+ * InputSplit}.
+ *
+ * <p>Several tasks may fill the same cache map concurrently, therefore the rows of one key are
+ * collected in a thread-safe collection.
+ */
+public class InputSplitCacheLoadTask implements Runnable {
+
+    private final ConcurrentHashMap<RowData, Collection<RowData>> cache;
+    private final RowDataKeySelector keySelector;
+    private final RowDataSerializer cacheEntriesSerializer;
+    private final InputFormat<RowData, InputSplit> inputFormat;
+    private final InputSplit inputSplit;
+
+    // switched off by stopRunning() to end the read loop before the split is exhausted
+    private volatile boolean isRunning = true;
+
+    public InputSplitCacheLoadTask(
+            ConcurrentHashMap<RowData, Collection<RowData>> cache,
+            RowDataKeySelector keySelector,
+            RowDataSerializer cacheEntriesSerializer,
+            InputFormat<RowData, InputSplit> inputFormat,
+            InputSplit inputSplit) {
+        this.cache = cache;
+        this.keySelector = keySelector;
+        this.inputFormat = inputFormat;
+        this.cacheEntriesSerializer = cacheEntriesSerializer;
+        this.inputSplit = inputSplit;
+    }
+
+    /**
+     * Reads the split into the cache and closes the input format afterwards.
+     *
+     * @throws RuntimeException if loading fails, or if loading succeeds but closing the input
+     *     format fails; a close failure that follows a load failure is attached to the load
+     *     failure as a suppressed exception
+     */
+    @Override
+    public void run() {
+        RuntimeException loadFailure = null;
+        try {
+            loadSplit();
+        } catch (Exception e) {
+            loadFailure =
+                    new RuntimeException(
+                            "Failed to load data into the lookup 'FULL' cache from InputSplit "
+                                    + inputSplit,
+                            e);
+            throw loadFailure;
+        } finally {
+            try {
+                closeInputFormat();
+            } catch (IOException e) {
+                // can't do try-with-resources
+                if (loadFailure == null) {
+                    throw new RuntimeException("Failed to close InputFormat.", e);
+                }
+                // don't let the close failure hide the reason why loading failed
+                loadFailure.addSuppressed(e);
+            }
+        }
+    }
+
+    /** Asks the task to stop reading; takes effect before the next record is read. */
+    public void stopRunning() {
+        isRunning = false;
+    }
+
+    /** Opens the input format for the split and copies its records into the cache. */
+    private void loadSplit() throws Exception {
+        if (inputFormat instanceof RichInputFormat) {
+            ((RichInputFormat<?, ?>) inputFormat).openInputFormat();
+        }
+        inputFormat.open(inputSplit);
+        RowData nextElement = new BinaryRowData(cacheEntriesSerializer.getArity());
+        while (isRunning && !inputFormat.reachedEnd()) {
+            nextElement = inputFormat.nextRecord(nextElement);
+            if (nextElement == null) {
+                break;
+            }
+            if (nextElement.getRowKind() != RowKind.INSERT) {
+                throw new IllegalStateException(
+                        "InputFormat must provide only INSERT records in lookup 'FULL' cache. Received record "
+                                + nextElement);
+            }
+            // the element instance is handed back to the input format for the next record,
+            // so the cache must keep its own copy
+            RowData record = cacheEntriesSerializer.copy(nextElement);
+            RowData key = keySelector.getKey(record);
+            if (hasNoNulls(key)) {
+                Collection<RowData> resultRows =
+                        cache.computeIfAbsent(
+                                key,
+                                // collection must be thread-safe and can possibly
+                                // contain duplicates
+                                k -> new ConcurrentLinkedQueue<>());
+                resultRows.add(record);
+            }
+        }
+    }
+
+    /** Closes the split and then the input format itself, even if closing the split fails. */
+    private void closeInputFormat() throws IOException {
+        try {
+            inputFormat.close();
+        } finally {
+            if (inputFormat instanceof RichInputFormat) {
+                ((RichInputFormat<?, ?>) inputFormat).closeInputFormat();
+            }
+        }
+    }
+
+    private static boolean hasNoNulls(RowData keys) {
+        // in SQL comparison 'null = null' returns false
+        // IS NOT DISTINCT FROM (which returns true) is not supported in lookup join
+        // so to prevent equality of 2 nulls when searching in cache
+        // we must not store rows with nulls in order not to violate semantics of '='
+        for (int i = 0; i < keys.getArity(); i++) {
+            if (keys.isNullAt(i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
new file mode 100644
index 00000000000..13f01fd7d76
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.keyselector;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+
+/**
+ * A KeySelector which will extract key from RowData. The key type is GenericRowData.
+ *
+ * <p>The projection is instantiated from the generated code on first use and is not shared with
+ * the selectors returned by {@link #copy()}.
+ */
+public class GenericRowDataKeySelector implements RowDataKeySelector {
+
+    private static final long serialVersionUID = 1L;
+
+    private final InternalTypeInfo<RowData> keyRowType;
+    private final RowDataSerializer keySerializer;
+    private final GeneratedProjection generatedProjection;
+    private transient Projection<RowData, GenericRowData> projection;
+
+    public GenericRowDataKeySelector(
+            InternalTypeInfo<RowData> keyRowType,
+            RowDataSerializer keySerializer,
+            GeneratedProjection generatedProjection) {
+        this.keyRowType = keyRowType;
+        this.generatedProjection = generatedProjection;
+        this.keySerializer = keySerializer;
+    }
+
+    /** Projects the key fields out of the row and returns a copy of the projected key. */
+    @Override
+    public RowData getKey(RowData value) throws Exception {
+        Projection<RowData, GenericRowData> keyProjection = lazyProjection();
+        GenericRowData projectedKey = keyProjection.apply(value);
+        return keySerializer.copy(projectedKey);
+    }
+
+    @Override
+    public InternalTypeInfo<RowData> getProducedType() {
+        return keyRowType;
+    }
+
+    /** Creates a new selector from the same type info, serializer and generated projection. */
+    public GenericRowDataKeySelector copy() {
+        return new GenericRowDataKeySelector(keyRowType, keySerializer, generatedProjection);
+    }
+
+    /** Returns the projection, instantiating it with the context class loader on first use. */
+    private Projection<RowData, GenericRowData> lazyProjection() {
+        if (projection != null) {
+            return projection;
+        }
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        //noinspection unchecked
+        projection = generatedProjection.newInstance(contextClassLoader);
+        return projection;
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/FullCacheTestInputFormat.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/FullCacheTestInputFormat.java
new file mode 100644
index 00000000000..8084a279276
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/FullCacheTestInputFormat.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.fullcache;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test {@link RichInputFormat} whose splits all read from one shared {@code queue}. Every load
+ * creates {@code DEFAULT_NUM_SPLITS} splits, plus {@code deltaNumSplits} additional splits on
+ * each load after the first one.
+ */
+public class FullCacheTestInputFormat
+        extends RichInputFormat<RowData, FullCacheTestInputFormat.QueueInputSplit> {
+
+    private static final int DEFAULT_NUM_SPLITS = 2;
+
+    /**
+     * Incremented by {@link #openInputFormat()} and {@link #open(QueueInputSplit)}, decremented by
+     * {@link #close()} and {@link #closeInputFormat()}. Static, hence shared by all instances.
+     */
+    private static final AtomicInteger OPEN_CLOSED_COUNTER = new AtomicInteger(0);
+
+    // RowData is not serializable, so we store Rows
+    private final Collection<Row> dataRows;
+    private final DataFormatConverters.RowConverter rowConverter;
+    private final int deltaNumSplits;
+    // transient: not part of the serialized form of this InputFormat
+    private final transient Consumer<Collection<RowData>> secondLoadDataChange;
+
+    private transient ConcurrentLinkedQueue<RowData> queue;
+    private int loadCounter;
+    private int maxReadRecords;
+    private int readRecords;
+
+    // counts calls of openInputFormat() and open(); nextRecord() expects exactly 2
+    private int numOpens;
+
+    /**
+     * Constructor for unit tests.
+     *
+     * @param dataRows rows converted and put into the shared queue on every load
+     * @param rowConverter converter from {@link Row} to {@link RowData}
+     * @param deltaNumSplits number of splits added on every load after the first one
+     * @param secondLoadDataChange applied to the queue during the second load; may be null
+     */
+    public FullCacheTestInputFormat(
+            Collection<Row> dataRows,
+            DataFormatConverters.RowConverter rowConverter,
+            int deltaNumSplits,
+            Consumer<Collection<RowData>> secondLoadDataChange) {
+        this.dataRows = dataRows;
+        this.rowConverter = rowConverter;
+        this.deltaNumSplits = deltaNumSplits;
+        this.secondLoadDataChange = secondLoadDataChange;
+    }
+
+    /**
+     * Constructor for integration tests: no additional splits and no data change on reload.
+     *
+     * @param dataRows rows converted and put into the shared queue on every load
+     * @param rowConverter converter from {@link Row} to {@link RowData}
+     */
+    public FullCacheTestInputFormat(
+            Collection<Row> dataRows, DataFormatConverters.RowConverter rowConverter) {
+        this(dataRows, rowConverter, 0, null);
+    }
+
+    /**
+     * Creates the splits of one load. All splits reference the same freshly filled queue.
+     *
+     * @param minNumSplits not used by this implementation
+     * @return {@code DEFAULT_NUM_SPLITS} splits on the first load, {@code deltaNumSplits} more on
+     *     later loads
+     */
+    @Override
+    public QueueInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        // additional splits are only created once at least one load has happened
+        int extraSplits = 0;
+        if (loadCounter > 0) {
+            extraSplits = deltaNumSplits;
+        }
+        int splitCount = DEFAULT_NUM_SPLITS + extraSplits;
+
+        ConcurrentLinkedQueue<RowData> sharedQueue = new ConcurrentLinkedQueue<>();
+        QueueInputSplit[] splits = new QueueInputSplit[splitCount];
+        IntStream.range(0, splitCount)
+                .forEach(index -> splits[index] = new QueueInputSplit(sharedQueue, index));
+
+        for (Row row : dataRows) {
+            sharedQueue.add(rowConverter.toInternal(row));
+        }
+
+        loadCounter++;
+        boolean isSecondLoad = loadCounter == 2;
+        if (isSecondLoad && secondLoadDataChange != null) {
+            secondLoadDataChange.accept(sharedQueue);
+        }
+
+        // divide data evenly between InputFormat copies
+        maxReadRecords = (int) Math.ceil((double) sharedQueue.size() / splitCount);
+        return splits;
+    }
+
+    @Override
+    public void openInputFormat() {
+        numOpens++;
+        OPEN_CLOSED_COUNTER.incrementAndGet();
+    }
+
+    /** Takes over the queue of {@code split} and resets the per-split read counter. */
+    @Override
+    public void open(QueueInputSplit split) throws IOException {
+        this.readRecords = 0;
+        this.queue = split.getQueue();
+        numOpens++;
+        OPEN_CLOSED_COUNTER.incrementAndGet();
+    }
+
+    /** @return true once the shared queue is empty */
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return queue.isEmpty();
+    }
+
+    /**
+     * Polls the next row from the shared queue.
+     *
+     * @param reuse not used by this implementation
+     * @return the polled row, or null once {@code maxReadRecords} rows were read since the last
+     *     {@link #open(QueueInputSplit)} (or if the queue has nothing to poll)
+     */
+    @Override
+    public RowData nextRecord(RowData reuse) throws IOException {
+        // both openInputFormat() and open() must have been called exactly once in total
+        assertThat(numOpens).isEqualTo(2);
+        if (readRecords != maxReadRecords) {
+            readRecords++;
+            return queue.poll();
+        }
+        return null;
+    }
+
+    @Override
+    public InputSplitAssigner getInputSplitAssigner(QueueInputSplit[] inputSplits) {
+        return new DefaultInputSplitAssigner(inputSplits);
+    }
+
+    @Override
+    public void configure(Configuration parameters) {}
+
+    /** @return always null; this test format provides no statistics */
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+        OPEN_CLOSED_COUNTER.decrementAndGet();
+    }
+
+    @Override
+    public void closeInputFormat() {
+        OPEN_CLOSED_COUNTER.decrementAndGet();
+    }
+
+    /** @return true if the shared open/close counter is back at zero */
+    public boolean isClosed() {
+        return OPEN_CLOSED_COUNTER.get() == 0;
+    }
+
+    /** {@link InputSplit} that provides queue to {@link InputFormat}. */
+    public static class QueueInputSplit implements InputSplit {
+
+        // transient: the queue is not part of the serialized form of the split
+        private final transient ConcurrentLinkedQueue<RowData> queue;
+        private final int splitNumber;
+
+        public QueueInputSplit(ConcurrentLinkedQueue<RowData> queue, int splitNumber) {
+            this.splitNumber = splitNumber;
+            this.queue = queue;
+        }
+
+        @Override
+        public int getSplitNumber() {
+            return splitNumber;
+        }
+
+        /** @return the queue this split was created with */
+        public ConcurrentLinkedQueue<RowData> getQueue() {
+            return queue;
+        }
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java
new file mode 100644
index 00000000000..3c4043313e8
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table.fullcache;
+
+import
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+
+/**
+ * {@link CacheReloadTrigger} for tests. A reload happens once when the trigger is opened and
+ * afterwards only when {@link #trigger()} is called explicitly.
+ */
+public class TestManualCacheReloadTrigger implements CacheReloadTrigger {
+
+    // set in open(); stays null until then
+    private Context context;
+    private boolean isClosed;
+
+    /**
+     * Requests a reload from the context and calls {@code get()} on the returned result. Does
+     * nothing if the trigger has not been opened yet.
+     */
+    public void trigger() throws Exception {
+        if (context == null) {
+            return;
+        }
+        context.triggerReload().get();
+    }
+
+    /** Remembers the context and immediately triggers a first reload. */
+    @Override
+    public void open(Context context) throws Exception {
+        this.context = context;
+        trigger();
+    }
+
+    /** Only records that the trigger was closed; see {@link #isClosed()}. */
+    @Override
+    public void close() throws Exception {
+        isClosed = true;
+    }
+
+    /** @return true once {@link #close()} has been called */
+    public boolean isClosed() {
+        return isClosed;
+    }
+}