This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 761af879198 [HUDI-6563] Supports flink lookup join (#9228)
761af879198 is described below

commit 761af879198c213947fe0ace32029e440489a442
Author: chao chen <[email protected]>
AuthorDate: Mon May 20 08:01:41 2024 +0800

    [HUDI-6563] Supports flink lookup join (#9228)
---
 .../apache/hudi/configuration/FlinkOptions.java    |  10 ++
 .../java/org/apache/hudi/source/FileIndex.java     |  25 +--
 .../apache/hudi/source/IncrementalInputSplits.java |   3 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  |   7 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |  84 +++++++---
 .../hudi/table/lookup/HoodieLookupFunction.java    | 177 +++++++++++++++++++++
 .../hudi/table/lookup/HoodieLookupTableReader.java |  73 +++++++++
 .../org/apache/hudi/util/SerializableSchema.java   | 108 +++++++++++++
 .../java/org/apache/hudi/source/TestFileIndex.java |  10 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  34 +++-
 .../apache/hudi/table/TestHoodieTableSource.java   |  10 +-
 .../apache/hudi/table/format/TestInputFormat.java  |   7 +-
 12 files changed, 497 insertions(+), 51 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 25fb00bc5a9..9a8b99a85c5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -47,12 +47,14 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
 import static 
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER;
 import static 
org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
 
@@ -1058,6 +1060,14 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(HoodieSyncTableStrategy.ALL.name())
       .withDescription("Hive table synchronization strategy. Available option: 
RO, RT, ALL.");
 
+  public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL =
+      key("lookup.join.cache.ttl")
+          .durationType()
+          .defaultValue(Duration.ofMinutes(60))
+          .withDescription(
+              "The cache TTL (e.g. 10min) for the build table in lookup 
join.");
+
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index a1461975811..3361a58e4e9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -38,14 +38,15 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -60,10 +61,12 @@ import java.util.stream.Collectors;
  *
  * <p>It caches the partition paths to avoid redundant look up.
  */
-public class FileIndex {
+public class FileIndex implements Serializable {
+  private static final long serialVersionUID = 1L;
+
   private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class);
 
-  private final Path path;
+  private final StoragePath path;
   private final RowType rowType;
   private final boolean tableExists;
   private final HoodieMetadataConfig metadataConfig;
@@ -73,7 +76,7 @@ public class FileIndex {
   private final int dataBucket;                                   // for 
bucket pruning
   private List<String> partitionPaths;                            // cache of 
partition paths
 
-  private FileIndex(Path path, Configuration conf, RowType rowType, DataPruner 
dataPruner, PartitionPruners.PartitionPruner partitionPruner, int dataBucket) {
+  private FileIndex(StoragePath path, Configuration conf, RowType rowType, 
DataPruner dataPruner, PartitionPruners.PartitionPruner partitionPruner, int 
dataBucket) {
     this.path = path;
     this.rowType = rowType;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
@@ -111,7 +114,7 @@ public class FileIndex {
       List<String> partitionKeys,
       String defaultParName,
       boolean hivePartition) {
-    if (partitionKeys.size() == 0) {
+    if (partitionKeys.isEmpty()) {
       // non partitioned table
       return Collections.emptyList();
     }
@@ -152,10 +155,10 @@ public class FileIndex {
     List<StoragePathInfo> allFiles = FSUtils.getFilesInPartitions(
             new HoodieFlinkEngineContext(hadoopConf), metadataConfig, 
path.toString(), partitions)
         .values().stream()
-        .flatMap(e -> e.stream())
+        .flatMap(Collection::stream)
         .collect(Collectors.toList());
 
-    if (allFiles.size() == 0) {
+    if (allFiles.isEmpty()) {
       // returns early for empty table.
       return allFiles;
     }
@@ -190,11 +193,11 @@ public class FileIndex {
    * @param partitionPath The relative partition path, may be empty if the 
table is non-partitioned.
    * @return The full partition path string
    */
-  private static String fullPartitionPath(Path basePath, String partitionPath) 
{
+  private static String fullPartitionPath(StoragePath basePath, String 
partitionPath) {
     if (partitionPath.isEmpty()) {
       return basePath.toString();
     }
-    return new Path(basePath, partitionPath).toString();
+    return new StoragePath(basePath, partitionPath).toString();
   }
 
   /**
@@ -338,7 +341,7 @@ public class FileIndex {
    * Builder for {@link FileIndex}.
    */
   public static class Builder {
-    private Path path;
+    private StoragePath path;
     private Configuration conf;
     private RowType rowType;
     private DataPruner dataPruner;
@@ -348,7 +351,7 @@ public class FileIndex {
     private Builder() {
     }
 
-    public Builder path(Path path) {
+    public Builder path(StoragePath path) {
       this.path = path;
       return this;
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 815c6a26350..fde19014589 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -38,6 +38,7 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.format.cdc.CdcInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -409,7 +410,7 @@ public class IncrementalInputSplits implements Serializable 
{
 
   private FileIndex getFileIndex() {
     return FileIndex.builder()
-        .path(new org.apache.hadoop.fs.Path(path.toUri()))
+        .path(new StoragePath(path.toUri()))
         .conf(conf)
         .rowType(rowType)
         .partitionPruner(partitionPruner)
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 65f0199ae80..ac35e14b935 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -30,8 +30,10 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.SerializableSchema;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.ConfigOption;
@@ -48,7 +50,6 @@ import 
org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,13 +82,13 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
     Configuration conf = 
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
-    Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
+    StoragePath path = new 
StoragePath(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
         new ValidationException("Option [path] should not be empty.")));
     setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     setupConfOptions(conf, context.getObjectIdentifier(), 
context.getCatalogTable(), schema);
     return new HoodieTableSource(
-        schema,
+        SerializableSchema.create(schema),
         path,
         context.getCatalogTable().getPartitionKeys(),
         conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 49f41d7fcad..0e3749a493f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -53,7 +53,10 @@ import 
org.apache.hudi.source.rebalance.selector.StreamReadBucketIndexKeySelecto
 import org.apache.hudi.source.prune.DataPruner;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.cdc.CdcInputFormat;
@@ -61,10 +64,13 @@ import 
org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
+import org.apache.hudi.table.lookup.HoodieLookupFunction;
+import org.apache.hudi.table.lookup.HoodieLookupTableReader;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.util.ExpressionUtils;
 import org.apache.hudi.util.InputFormats;
+import org.apache.hudi.util.SerializableSchema;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -80,11 +86,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -99,6 +105,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -111,6 +119,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.hudi.configuration.FlinkOptions.LOOKUP_JOIN_CACHE_TTL;
 import static 
org.apache.hudi.configuration.HadoopConfigurations.getParquetConf;
 import static org.apache.hudi.util.ExpressionUtils.filterSimpleCallExpression;
 import static org.apache.hudi.util.ExpressionUtils.splitExprByPartitionCall;
@@ -122,18 +131,21 @@ public class HoodieTableSource implements
     ScanTableSource,
     SupportsProjectionPushDown,
     SupportsLimitPushDown,
-    SupportsFilterPushDown {
+    SupportsFilterPushDown,
+    LookupTableSource,
+    Serializable {
+  private static final long serialVersionUID = 1L;
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTableSource.class);
 
   private static final long NO_LIMIT_CONSTANT = -1;
 
-  private final transient org.apache.hadoop.conf.Configuration hadoopConf;
-  private final transient HoodieTableMetaClient metaClient;
+  private final StorageConfiguration<org.apache.hadoop.conf.Configuration> 
hadoopConf;
+  private final HoodieTableMetaClient metaClient;
   private final long maxCompactionMemoryInBytes;
 
-  private final ResolvedSchema schema;
+  private final SerializableSchema schema;
   private final RowType tableRowType;
-  private final Path path;
+  private final StoragePath path;
   private final List<String> partitionKeys;
   private final String defaultPartName;
   private final Configuration conf;
@@ -145,11 +157,11 @@ public class HoodieTableSource implements
   private DataPruner dataPruner;
   private PartitionPruners.PartitionPruner partitionPruner;
   private int dataBucket;
-  private FileIndex fileIndex;
+  private transient FileIndex fileIndex;
 
   public HoodieTableSource(
-      ResolvedSchema schema,
-      Path path,
+      SerializableSchema schema,
+      StoragePath path,
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf) {
@@ -157,8 +169,8 @@ public class HoodieTableSource implements
   }
 
   public HoodieTableSource(
-      ResolvedSchema schema,
-      Path path,
+      SerializableSchema schema,
+      StoragePath path,
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf,
@@ -170,7 +182,7 @@ public class HoodieTableSource implements
       @Nullable Long limit,
       @Nullable HoodieTableMetaClient metaClient,
       @Nullable InternalSchemaManager internalSchemaManager) {
-    this.schema = 
ResolvedSchema.of(schema.getColumns().stream().filter(Column::isPhysical).collect(Collectors.toList()));
+    this.schema = schema;
     this.tableRowType = (RowType) 
this.schema.toSourceRowDataType().notNull().getLogicalType();
     this.path = path;
     this.partitionKeys = partitionKeys;
@@ -182,8 +194,8 @@ public class HoodieTableSource implements
     this.dataBucket = dataBucket;
     this.requiredPos = Optional.ofNullable(requiredPos).orElseGet(() -> 
IntStream.range(0, this.tableRowType.getFieldCount()).toArray());
     this.limit = Optional.ofNullable(limit).orElse(NO_LIMIT_CONSTANT);
-    this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-    this.metaClient = Optional.ofNullable(metaClient).orElseGet(() -> 
StreamerUtil.metaClientForReader(conf, hadoopConf));
+    this.hadoopConf = new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
+    this.metaClient = Optional.ofNullable(metaClient).orElseGet(() -> 
StreamerUtil.metaClientForReader(conf, this.hadoopConf.unwrap()));
     this.maxCompactionMemoryInBytes = 
StreamerUtil.getMaxCompactionMemoryInBytes(conf);
     this.internalSchemaManager = 
Optional.ofNullable(internalSchemaManager).orElseGet(() -> 
InternalSchemaManager.get(this.conf, this.metaClient));
   }
@@ -290,6 +302,18 @@ public class HoodieTableSource implements
     this.limit = limit;
   }
 
+  @Override
+  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) 
{
+    Duration duration = conf.get(LOOKUP_JOIN_CACHE_TTL);
+    return TableFunctionProvider.of(
+        new HoodieLookupFunction(
+            new HoodieLookupTableReader(this::getBatchInputFormat, conf),
+            tableRowType,
+            getLookupKeys(context.getKeys()),
+            duration
+        ));
+  }
+
   private DataType getProducedDataType() {
     String[] schemaFieldNames = this.schema.getColumnNames().toArray(new 
String[0]);
     DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new 
DataType[0]);
@@ -325,8 +349,8 @@ public class HoodieTableSource implements
     LOG.info("Partition pruner for hoodie source, condition is:\n" + joiner);
     List<ExpressionEvaluators.Evaluator> evaluators = 
ExpressionEvaluators.fromExpression(partitionFilters);
     List<DataType> partitionTypes = this.partitionKeys.stream().map(name ->
-        this.schema.getColumn(name).orElseThrow(() -> new 
HoodieValidationException("Field " + name + " does not exist")))
-        .map(Column::getDataType)
+            this.schema.getColumn(name).orElseThrow(() -> new 
HoodieValidationException("Field " + name + " does not exist")))
+        .map(SerializableSchema.Column::getDataType)
         .collect(Collectors.toList());
     String defaultParName = 
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
     boolean hivePartition = 
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
@@ -348,11 +372,11 @@ public class HoodieTableSource implements
   private List<MergeOnReadInputSplit> buildInputSplits() {
     FileIndex fileIndex = getOrBuildFileIndex();
     List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
-    if (relPartitionPaths.size() == 0) {
+    if (relPartitionPaths.isEmpty()) {
       return Collections.emptyList();
     }
     List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
-    if (pathInfoList.size() == 0) {
+    if (pathInfoList.isEmpty()) {
       throw new HoodieException("No files found for reading in user provided 
path.");
     }
 
@@ -404,7 +428,7 @@ public class HoodieTableSource implements
         switch (tableType) {
           case MERGE_ON_READ:
             final List<MergeOnReadInputSplit> inputSplits = buildInputSplits();
-            if (inputSplits.size() == 0) {
+            if (inputSplits.isEmpty()) {
               // When there is no input splits, just return an empty source.
               LOG.warn("No input splits generate for MERGE_ON_READ input 
format, returns empty collection instead");
               return InputFormats.EMPTY_INPUT_FORMAT;
@@ -536,7 +560,7 @@ public class HoodieTableSource implements
 
   private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
     final List<StoragePathInfo> pathInfoList = getReadFiles();
-    if (pathInfoList.size() == 0) {
+    if (pathInfoList.isEmpty()) {
       return InputFormats.EMPTY_INPUT_FORMAT;
     }
 
@@ -560,7 +584,7 @@ public class HoodieTableSource implements
         this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
         this.predicates,
         this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // 
ParquetInputFormat always uses the limit value
-        getParquetConf(this.conf, this.hadoopConf),
+        getParquetConf(this.conf, this.hadoopConf.unwrap()),
         this.conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE),
         this.internalSchemaManager
     );
@@ -585,6 +609,20 @@ public class HoodieTableSource implements
     return this.fileIndex;
   }
 
+  /**
+   * Flattens the lookup key index paths passed in by the planner.
+   *
+   * @param keys the lookup keys; each entry is the field index path of one key
+   * @return the top-level field index of each lookup key
+   * @throws UnsupportedOperationException if any key references a nested field
+   */
+  private int[] getLookupKeys(int[][] keys) {
+    int[] keyIndices = new int[keys.length];
+    for (int i = 0; i < keys.length; i++) {
+      if (keys[i].length > 1) {
+        // This is the Hudi source, not the Hive connector the code was adapted from.
+        throw new UnsupportedOperationException(
+            "Hudi lookup join does not support nested keys currently.");
+      }
+      keyIndices[i] = keys[i][0];
+    }
+    return keyIndices;
+  }
+
   @VisibleForTesting
   public Schema getTableAvroSchema() {
     try {
@@ -623,7 +661,7 @@ public class HoodieTableSource implements
   public List<StoragePathInfo> getReadFiles() {
     FileIndex fileIndex = getOrBuildFileIndex();
     List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
-    if (relPartitionPaths.size() == 0) {
+    if (relPartitionPaths.isEmpty()) {
       return Collections.emptyList();
     }
     return fileIndex.getFilesInPartitions();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
new file mode 100644
index 00000000000..3fc65ce12b4
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Lookup function that loads the whole Hudi table into an in-memory cache and
+ * serves lookup-join requests from that cache, reloading it when the configured
+ * TTL ({@code lookup.join.cache.ttl}) expires.
+ *
+ * <p>Note: mirrors Flink's {@code FileSystemLookupFunction} to avoid additional
+ * connector jar dependencies.
+ */
+public class HoodieLookupFunction extends TableFunction<RowData> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLookupFunction.class);
+
+  // The max number of retries before throwing an exception, in case of failure
+  // to load the table into the cache.
+  private static final int MAX_RETRIES = 3;
+  // Base interval between retries; actual backoff grows linearly with the attempt number.
+  private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
+
+  private final HoodieLookupTableReader partitionReader;
+  private final RowData.FieldGetter[] lookupFieldGetters;
+  private final Duration reloadInterval;
+  private final TypeSerializer<RowData> serializer;
+  private final RowType rowType;
+
+  // Cache mapping a lookup key to all rows sharing that key; rebuilt on expiry.
+  private transient Map<RowData, List<RowData>> cache;
+  // Epoch millis at which the cache expires; -1 means the cache was never loaded.
+  private transient long nextLoadTime;
+
+  public HoodieLookupFunction(
+      HoodieLookupTableReader partitionReader,
+      RowType rowType,
+      int[] lookupKeys,
+      Duration reloadInterval) {
+    this.partitionReader = partitionReader;
+    this.rowType = rowType;
+    this.lookupFieldGetters = new RowData.FieldGetter[lookupKeys.length];
+    for (int i = 0; i < lookupKeys.length; i++) {
+      lookupFieldGetters[i] =
+          RowData.createFieldGetter(rowType.getTypeAt(lookupKeys[i]), lookupKeys[i]);
+    }
+    this.reloadInterval = reloadInterval;
+    this.serializer = InternalSerializers.create(rowType);
+  }
+
+  @Override
+  public void open(FunctionContext context) throws Exception {
+    super.open(context);
+    cache = new HashMap<>();
+    nextLoadTime = -1L;
+  }
+
+  @Override
+  public TypeInformation<RowData> getResultType() {
+    return InternalTypeInfo.of(rowType);
+  }
+
+  /**
+   * Looks up all cached rows matching the given key values and emits them.
+   * Triggers a (re)load of the cache first if the TTL has elapsed.
+   */
+  public void eval(Object... values) {
+    checkCacheReload();
+    RowData lookupKey = GenericRowData.of(values);
+    List<RowData> matchedRows = cache.get(lookupKey);
+    if (matchedRows != null) {
+      for (RowData matchedRow : matchedRows) {
+        collect(matchedRow);
+      }
+    }
+  }
+
+  /**
+   * (Re)loads the cache when it has expired, retrying up to {@link #MAX_RETRIES}
+   * times with linear backoff before giving up.
+   */
+  private void checkCacheReload() {
+    if (nextLoadTime > System.currentTimeMillis()) {
+      return;
+    }
+    if (nextLoadTime > 0) {
+      LOG.info(
+          "Lookup join cache has expired after {} minute(s), reloading",
+          reloadInterval.toMinutes());
+    } else {
+      LOG.info("Populating lookup join cache");
+    }
+    int numRetry = 0;
+    while (true) {
+      // Build into a fresh map so a failed attempt never leaves the live cache
+      // half-populated or cleared.
+      Map<RowData, List<RowData>> newCache = new HashMap<>();
+      try {
+        long count = 0;
+        GenericRowData reuse = new GenericRowData(rowType.getFieldCount());
+        partitionReader.open();
+        try {
+          RowData row;
+          while ((row = partitionReader.read(reuse)) != null) {
+            count++;
+            // The reader may reuse the row object, so copy before caching.
+            RowData rowData = serializer.copy(row);
+            newCache.computeIfAbsent(extractLookupKey(rowData), k -> new ArrayList<>())
+                .add(rowData);
+          }
+        } finally {
+          // Always release the reader, even when reading fails, so a retry
+          // does not leak the previously opened resources.
+          partitionReader.close();
+        }
+        cache = newCache;
+        nextLoadTime = System.currentTimeMillis() + reloadInterval.toMillis();
+        LOG.info("Loaded {} row(s) into lookup join cache", count);
+        return;
+      } catch (Exception e) {
+        if (numRetry >= MAX_RETRIES) {
+          throw new FlinkRuntimeException(
+              String.format(
+                  "Failed to load table into cache after %d retries", numRetry),
+              e);
+        }
+        numRetry++;
+        long toSleep = numRetry * RETRY_INTERVAL.toMillis();
+        LOG.warn(
+            String.format(
+                "Failed to load table into cache, will retry in %d seconds",
+                toSleep / 1000),
+            e);
+        try {
+          Thread.sleep(toSleep);
+        } catch (InterruptedException ex) {
+          // Restore the interrupt status before bailing out.
+          Thread.currentThread().interrupt();
+          LOG.warn("Interrupted while waiting to retry failed cache load, aborting");
+          throw new FlinkRuntimeException(ex);
+        }
+      }
+    }
+  }
+
+  // Projects the lookup key fields out of a full row.
+  private RowData extractLookupKey(RowData row) {
+    GenericRowData key = new GenericRowData(lookupFieldGetters.length);
+    for (int i = 0; i < lookupFieldGetters.length; i++) {
+      key.setField(i, lookupFieldGetters[i].getFieldOrNull(row));
+    }
+    return key;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // No operation: the reader is opened and closed per cache load.
+  }
+
+  @VisibleForTesting
+  public Duration getReloadInterval() {
+    return reloadInterval;
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
new file mode 100644
index 00000000000..0460cd42691
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.hudi.common.function.SerializableSupplier;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.table.data.RowData;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Hudi lookup table reader.
+ *
+ * <p>Sequentially reads every input split produced by the supplied
+ * {@link InputFormat} so the lookup cache sees the complete table.
+ */
+public class HoodieLookupTableReader implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final SerializableSupplier<InputFormat<RowData, ?>> inputFormatSupplier;
+  private final Configuration conf;
+
+  // Runtime-only state, re-created by open(); must not be serialized.
+  private transient InputFormat inputFormat;
+  private transient InputSplit[] inputSplits;
+  private transient int nextSplit;
+
+  public HoodieLookupTableReader(SerializableSupplier<InputFormat<RowData, ?>> inputFormatSupplier, Configuration conf) {
+    this.inputFormatSupplier = inputFormatSupplier;
+    this.conf = conf;
+  }
+
+  /**
+   * Creates and configures the input format, then opens the first split.
+   */
+  public void open() throws IOException {
+    this.inputFormat = inputFormatSupplier.get();
+    inputFormat.configure(conf);
+    // Ask for a single split but honor however many the format actually creates:
+    // reading only inputSplits[0] would silently drop the rest of the table.
+    this.inputSplits = inputFormat.createInputSplits(1);
+    this.nextSplit = 0;
+    if (inputFormat instanceof RichInputFormat) {
+      ((RichInputFormat) inputFormat).openInputFormat();
+    }
+    if (inputSplits.length > 0) {
+      inputFormat.open(inputSplits[nextSplit++]);
+    }
+  }
+
+  /**
+   * Returns the next record, transparently advancing through all splits;
+   * returns {@code null} once every split is exhausted.
+   */
+  @Nullable
+  public RowData read(RowData reuse) throws IOException {
+    if (inputSplits == null || inputSplits.length == 0) {
+      return null;
+    }
+    while (inputFormat.reachedEnd()) {
+      if (nextSplit >= inputSplits.length) {
+        return null;
+      }
+      // Current split drained: move on to the next one.
+      inputFormat.close();
+      inputFormat.open(inputSplits[nextSplit++]);
+    }
+    return (RowData) inputFormat.nextRecord(reuse);
+  }
+
+  public void close() throws IOException {
+    if (inputFormat == null) {
+      return;
+    }
+    try {
+      inputFormat.close();
+    } finally {
+      // Run the rich lifecycle hook even if close() throws.
+      if (inputFormat instanceof RichInputFormat) {
+        ((RichInputFormat) inputFormat).closeInputFormat();
+      }
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/SerializableSchema.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/SerializableSchema.java
new file mode 100644
index 00000000000..626a238178c
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/SerializableSchema.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.ROW;
+
+/**
+ * A serializable substitute for {@code ResolvedSchema}.
+ */
+public class SerializableSchema implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final List<Column> columns;
+
+  private SerializableSchema(List<Column> columns) {
+    this.columns = columns;
+  }
+
+  public static SerializableSchema create(ResolvedSchema resolvedSchema) {
+    List<Column> columns = resolvedSchema.getColumns().stream()
+        .filter(org.apache.flink.table.catalog.Column::isPhysical)
+        .map(column -> Column.create(column.getName(), column.getDataType()))
+        .collect(Collectors.toList());
+    return new SerializableSchema(columns);
+  }
+
+  public List<String> getColumnNames() {
+    return 
this.columns.stream().map(Column::getName).collect(Collectors.toList());
+  }
+
+  public List<DataType> getColumnDataTypes() {
+    return 
this.columns.stream().map(Column::getDataType).collect(Collectors.toList());
+  }
+
+  public Option<Column> getColumn(String columnName) {
+    return Option.fromJavaOptional(this.columns.stream().filter((col) -> 
col.getName().equals(columnName)).findFirst());
+  }
+
+  public DataType toSourceRowDataType() {
+    return this.toRowDataType((c) -> true);
+  }
+
+  private DataType toRowDataType(Predicate<Column> predicate) {
+    final DataTypes.Field[] fieldsArray = columns.stream().filter(predicate)
+        .map(SerializableSchema::columnToField)
+        .toArray(DataTypes.Field[]::new);
+    // the row should never be null
+    return ROW(fieldsArray).notNull();
+  }
+
+  private static DataTypes.Field columnToField(Column column) {
+    return DataTypes.FIELD(column.getName(), 
DataTypeUtils.removeTimeAttribute(column.getDataType()));
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  public static class Column implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String name;
+    private final DataType dataType;
+
+    private Column(String name, DataType dataType) {
+      this.name = name;
+      this.dataType = dataType;
+    }
+
+    public static Column create(String name, DataType type) {
+      return new Column(name, type);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public DataType getDataType() {
+      return dataType;
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index 8ed8a391010..20a5cb34fdf 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -37,7 +38,6 @@ import 
org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -75,7 +75,7 @@ public class TestFileIndex {
     conf.setBoolean(METADATA_ENABLED, true);
     conf.setBoolean(HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
-    FileIndex fileIndex = FileIndex.builder().path(new 
Path(tempFile.getAbsolutePath())).conf(conf)
+    FileIndex fileIndex = FileIndex.builder().path(new 
StoragePath(tempFile.getAbsolutePath())).conf(conf)
         .rowType(TestConfigurations.ROW_TYPE).build();
     List<String> partitionKeys = Collections.singletonList("partition");
     List<Map<String, String>> partitions =
@@ -99,7 +99,7 @@ public class TestFileIndex {
     conf.setString(KEYGEN_CLASS_NAME, 
NonpartitionedAvroKeyGenerator.class.getName());
     conf.setBoolean(METADATA_ENABLED, true);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
-    FileIndex fileIndex = FileIndex.builder().path(new 
Path(tempFile.getAbsolutePath())).conf(conf)
+    FileIndex fileIndex = FileIndex.builder().path(new 
StoragePath(tempFile.getAbsolutePath())).conf(conf)
         .rowType(TestConfigurations.ROW_TYPE).build();
     List<String> partitionKeys = Collections.singletonList("");
     List<Map<String, String>> partitions =
@@ -117,7 +117,7 @@ public class TestFileIndex {
   void testFileListingEmptyTable(boolean enableMetadata) {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(METADATA_ENABLED, enableMetadata);
-    FileIndex fileIndex = FileIndex.builder().path(new 
Path(tempFile.getAbsolutePath())).conf(conf)
+    FileIndex fileIndex = FileIndex.builder().path(new 
StoragePath(tempFile.getAbsolutePath())).conf(conf)
         .rowType(TestConfigurations.ROW_TYPE).build();
     List<String> partitionKeys = Collections.singletonList("partition");
     List<Map<String, String>> partitions =
@@ -140,7 +140,7 @@ public class TestFileIndex {
 
     FileIndex fileIndex =
         FileIndex.builder()
-            .path(new Path(tempFile.getAbsolutePath()))
+            .path(new StoragePath(tempFile.getAbsolutePath()))
             .conf(conf).rowType(TestConfigurations.ROW_TYPE_BIGINT)
             .dataPruner(DataPruner.newInstance(Collections.singletonList(new 
CallExpression(
                 FunctionIdentifier.of("greaterThan"),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index dc253587d5b..78211e34a7f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -80,8 +80,8 @@ import java.util.stream.Stream;
 import static org.apache.hudi.utils.TestConfigurations.catalog;
 import static org.apache.hudi.utils.TestConfigurations.sql;
 import static org.apache.hudi.utils.TestData.array;
-import static org.apache.hudi.utils.TestData.assertRowsEqualsUnordered;
 import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.apache.hudi.utils.TestData.assertRowsEqualsUnordered;
 import static org.apache.hudi.utils.TestData.map;
 import static org.apache.hudi.utils.TestData.row;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -668,6 +668,38 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testLookupJoin(HoodieTableType tableType) {
+    TableEnvironment tableEnv = streamTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_NAME, tableType)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    String hoodieTableDDL2 = sql("t2")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_NAME, tableType)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL2);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    tableEnv.executeSql("create view t1_view as select *,"
+        + "PROCTIME() as proc_time from t1");
+
+    // Join two hudi tables with the same data
+    String sql = "insert into t2 select b.* from t1_view o "
+        + "       join t1/*+ OPTIONS('lookup.join.cache.ttl'= '2 day') */  "
+        + "       FOR SYSTEM_TIME AS OF o.proc_time AS b on o.uuid = b.uuid";
+    execInsertSql(tableEnv, sql);
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t2").execute().collect());
+
+    assertRowsEquals(result, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 1d9db480d38..1e3550aee27 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -23,8 +23,10 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.source.prune.DataPruner;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.util.SerializableSchema;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
@@ -117,8 +119,8 @@ public class TestHoodieTableSource {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
     HoodieTableSource tableSource = new HoodieTableSource(
-        TestConfigurations.TABLE_SCHEMA,
-        new Path(tempFile.getPath()),
+        SerializableSchema.create(TestConfigurations.TABLE_SCHEMA),
+        new StoragePath(tempFile.getPath()),
         
Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
         "default-par",
         conf);
@@ -334,8 +336,8 @@ public class TestHoodieTableSource {
 
   private HoodieTableSource createHoodieTableSource(Configuration conf) {
     return new HoodieTableSource(
-        TestConfigurations.TABLE_SCHEMA,
-        new Path(conf.getString(FlinkOptions.PATH)),
+        SerializableSchema.create(TestConfigurations.TABLE_SCHEMA),
+        new StoragePath(conf.getString(FlinkOptions.PATH)),
         
Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
         "default-par",
         conf);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index eadf4ffb26c..00ed50b7514 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTableSource;
 import org.apache.hudi.table.format.cdc.CdcInputFormat;
@@ -41,6 +42,7 @@ import 
org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.SerializableSchema;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -55,7 +57,6 @@ import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -1182,8 +1183,8 @@ public class TestInputFormat {
 
   private HoodieTableSource getTableSource(Configuration conf) {
     return new HoodieTableSource(
-        TestConfigurations.TABLE_SCHEMA,
-        new Path(tempFile.getAbsolutePath()),
+        SerializableSchema.create(TestConfigurations.TABLE_SCHEMA),
+        new StoragePath(tempFile.getAbsolutePath()),
         Collections.singletonList("partition"),
         "default",
         conf);


Reply via email to