This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 761af879198 [HUDI-6563] Supports flink lookup join (#9228)
761af879198 is described below
commit 761af879198c213947fe0ace32029e440489a442
Author: chao chen <[email protected]>
AuthorDate: Mon May 20 08:01:41 2024 +0800
[HUDI-6563] Supports flink lookup join (#9228)
---
.../apache/hudi/configuration/FlinkOptions.java | 10 ++
.../java/org/apache/hudi/source/FileIndex.java | 25 +--
.../apache/hudi/source/IncrementalInputSplits.java | 3 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 7 +-
.../org/apache/hudi/table/HoodieTableSource.java | 84 +++++++---
.../hudi/table/lookup/HoodieLookupFunction.java | 177 +++++++++++++++++++++
.../hudi/table/lookup/HoodieLookupTableReader.java | 73 +++++++++
.../org/apache/hudi/util/SerializableSchema.java | 108 +++++++++++++
.../java/org/apache/hudi/source/TestFileIndex.java | 10 +-
.../apache/hudi/table/ITTestHoodieDataSource.java | 34 +++-
.../apache/hudi/table/TestHoodieTableSource.java | 10 +-
.../apache/hudi/table/format/TestInputFormat.java | 7 +-
12 files changed, 497 insertions(+), 51 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 25fb00bc5a9..9a8b99a85c5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -47,12 +47,14 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.flink.configuration.ConfigOptions.key;
import static
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER;
import static
org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
@@ -1058,6 +1060,14 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(HoodieSyncTableStrategy.ALL.name())
.withDescription("Hive table synchronization strategy. Available option:
RO, RT, ALL.");
+ public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL =
+ key("lookup.join.cache.ttl")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(60))
+ .withDescription(
+ "The cache TTL (e.g. 10min) for the build table in lookup
join.");
+
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index a1461975811..3361a58e4e9 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -38,14 +38,15 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -60,10 +61,12 @@ import java.util.stream.Collectors;
*
* <p>It caches the partition paths to avoid redundant look up.
*/
-public class FileIndex {
+public class FileIndex implements Serializable {
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class);
- private final Path path;
+ private final StoragePath path;
private final RowType rowType;
private final boolean tableExists;
private final HoodieMetadataConfig metadataConfig;
@@ -73,7 +76,7 @@ public class FileIndex {
private final int dataBucket; // for
bucket pruning
private List<String> partitionPaths; // cache of
partition paths
- private FileIndex(Path path, Configuration conf, RowType rowType, DataPruner
dataPruner, PartitionPruners.PartitionPruner partitionPruner, int dataBucket) {
+ private FileIndex(StoragePath path, Configuration conf, RowType rowType,
DataPruner dataPruner, PartitionPruners.PartitionPruner partitionPruner, int
dataBucket) {
this.path = path;
this.rowType = rowType;
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
@@ -111,7 +114,7 @@ public class FileIndex {
List<String> partitionKeys,
String defaultParName,
boolean hivePartition) {
- if (partitionKeys.size() == 0) {
+ if (partitionKeys.isEmpty()) {
// non partitioned table
return Collections.emptyList();
}
@@ -152,10 +155,10 @@ public class FileIndex {
List<StoragePathInfo> allFiles = FSUtils.getFilesInPartitions(
new HoodieFlinkEngineContext(hadoopConf), metadataConfig,
path.toString(), partitions)
.values().stream()
- .flatMap(e -> e.stream())
+ .flatMap(Collection::stream)
.collect(Collectors.toList());
- if (allFiles.size() == 0) {
+ if (allFiles.isEmpty()) {
// returns early for empty table.
return allFiles;
}
@@ -190,11 +193,11 @@ public class FileIndex {
* @param partitionPath The relative partition path, may be empty if the
table is non-partitioned.
* @return The full partition path string
*/
- private static String fullPartitionPath(Path basePath, String partitionPath)
{
+ private static String fullPartitionPath(StoragePath basePath, String
partitionPath) {
if (partitionPath.isEmpty()) {
return basePath.toString();
}
- return new Path(basePath, partitionPath).toString();
+ return new StoragePath(basePath, partitionPath).toString();
}
/**
@@ -338,7 +341,7 @@ public class FileIndex {
* Builder for {@link FileIndex}.
*/
public static class Builder {
- private Path path;
+ private StoragePath path;
private Configuration conf;
private RowType rowType;
private DataPruner dataPruner;
@@ -348,7 +351,7 @@ public class FileIndex {
private Builder() {
}
- public Builder path(Path path) {
+ public Builder path(StoragePath path) {
this.path = path;
return this;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 815c6a26350..fde19014589 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -38,6 +38,7 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -409,7 +410,7 @@ public class IncrementalInputSplits implements Serializable
{
private FileIndex getFileIndex() {
return FileIndex.builder()
- .path(new org.apache.hadoop.fs.Path(path.toUri()))
+ .path(new StoragePath(path.toUri()))
.conf(conf)
.rowType(rowType)
.partitionPruner(partitionPruner)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 65f0199ae80..ac35e14b935 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -30,8 +30,10 @@ import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.ConfigOption;
@@ -48,7 +50,6 @@ import
org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,13 +82,13 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
Configuration conf =
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
- Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
+ StoragePath path = new
StoragePath(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
setupConfOptions(conf, context.getObjectIdentifier(),
context.getCatalogTable(), schema);
return new HoodieTableSource(
- schema,
+ SerializableSchema.create(schema),
path,
context.getCatalogTable().getPartitionKeys(),
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 49f41d7fcad..0e3749a493f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -53,7 +53,10 @@ import
org.apache.hudi.source.rebalance.selector.StreamReadBucketIndexKeySelecto
import org.apache.hudi.source.prune.DataPruner;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
@@ -61,10 +64,13 @@ import
org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
+import org.apache.hudi.table.lookup.HoodieLookupFunction;
+import org.apache.hudi.table.lookup.HoodieLookupTableReader;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.InputFormats;
+import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
@@ -80,11 +86,11 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -99,6 +105,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -111,6 +119,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.hudi.configuration.FlinkOptions.LOOKUP_JOIN_CACHE_TTL;
import static
org.apache.hudi.configuration.HadoopConfigurations.getParquetConf;
import static org.apache.hudi.util.ExpressionUtils.filterSimpleCallExpression;
import static org.apache.hudi.util.ExpressionUtils.splitExprByPartitionCall;
@@ -122,18 +131,21 @@ public class HoodieTableSource implements
ScanTableSource,
SupportsProjectionPushDown,
SupportsLimitPushDown,
- SupportsFilterPushDown {
+ SupportsFilterPushDown,
+ LookupTableSource,
+ Serializable {
+ private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(HoodieTableSource.class);
private static final long NO_LIMIT_CONSTANT = -1;
- private final transient org.apache.hadoop.conf.Configuration hadoopConf;
- private final transient HoodieTableMetaClient metaClient;
+ private final StorageConfiguration<org.apache.hadoop.conf.Configuration>
hadoopConf;
+ private final HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;
- private final ResolvedSchema schema;
+ private final SerializableSchema schema;
private final RowType tableRowType;
- private final Path path;
+ private final StoragePath path;
private final List<String> partitionKeys;
private final String defaultPartName;
private final Configuration conf;
@@ -145,11 +157,11 @@ public class HoodieTableSource implements
private DataPruner dataPruner;
private PartitionPruners.PartitionPruner partitionPruner;
private int dataBucket;
- private FileIndex fileIndex;
+ private transient FileIndex fileIndex;
public HoodieTableSource(
- ResolvedSchema schema,
- Path path,
+ SerializableSchema schema,
+ StoragePath path,
List<String> partitionKeys,
String defaultPartName,
Configuration conf) {
@@ -157,8 +169,8 @@ public class HoodieTableSource implements
}
public HoodieTableSource(
- ResolvedSchema schema,
- Path path,
+ SerializableSchema schema,
+ StoragePath path,
List<String> partitionKeys,
String defaultPartName,
Configuration conf,
@@ -170,7 +182,7 @@ public class HoodieTableSource implements
@Nullable Long limit,
@Nullable HoodieTableMetaClient metaClient,
@Nullable InternalSchemaManager internalSchemaManager) {
- this.schema =
ResolvedSchema.of(schema.getColumns().stream().filter(Column::isPhysical).collect(Collectors.toList()));
+ this.schema = schema;
this.tableRowType = (RowType)
this.schema.toSourceRowDataType().notNull().getLogicalType();
this.path = path;
this.partitionKeys = partitionKeys;
@@ -182,8 +194,8 @@ public class HoodieTableSource implements
this.dataBucket = dataBucket;
this.requiredPos = Optional.ofNullable(requiredPos).orElseGet(() ->
IntStream.range(0, this.tableRowType.getFieldCount()).toArray());
this.limit = Optional.ofNullable(limit).orElse(NO_LIMIT_CONSTANT);
- this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
- this.metaClient = Optional.ofNullable(metaClient).orElseGet(() ->
StreamerUtil.metaClientForReader(conf, hadoopConf));
+ this.hadoopConf = new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
+ this.metaClient = Optional.ofNullable(metaClient).orElseGet(() ->
StreamerUtil.metaClientForReader(conf, this.hadoopConf.unwrap()));
this.maxCompactionMemoryInBytes =
StreamerUtil.getMaxCompactionMemoryInBytes(conf);
this.internalSchemaManager =
Optional.ofNullable(internalSchemaManager).orElseGet(() ->
InternalSchemaManager.get(this.conf, this.metaClient));
}
@@ -290,6 +302,18 @@ public class HoodieTableSource implements
this.limit = limit;
}
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context)
{
+ Duration duration = conf.get(LOOKUP_JOIN_CACHE_TTL);
+ return TableFunctionProvider.of(
+ new HoodieLookupFunction(
+ new HoodieLookupTableReader(this::getBatchInputFormat, conf),
+ tableRowType,
+ getLookupKeys(context.getKeys()),
+ duration
+ ));
+ }
+
private DataType getProducedDataType() {
String[] schemaFieldNames = this.schema.getColumnNames().toArray(new
String[0]);
DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new
DataType[0]);
@@ -325,8 +349,8 @@ public class HoodieTableSource implements
LOG.info("Partition pruner for hoodie source, condition is:\n" + joiner);
List<ExpressionEvaluators.Evaluator> evaluators =
ExpressionEvaluators.fromExpression(partitionFilters);
List<DataType> partitionTypes = this.partitionKeys.stream().map(name ->
- this.schema.getColumn(name).orElseThrow(() -> new
HoodieValidationException("Field " + name + " does not exist")))
- .map(Column::getDataType)
+ this.schema.getColumn(name).orElseThrow(() -> new
HoodieValidationException("Field " + name + " does not exist")))
+ .map(SerializableSchema.Column::getDataType)
.collect(Collectors.toList());
String defaultParName =
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
boolean hivePartition =
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
@@ -348,11 +372,11 @@ public class HoodieTableSource implements
private List<MergeOnReadInputSplit> buildInputSplits() {
FileIndex fileIndex = getOrBuildFileIndex();
List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
- if (relPartitionPaths.size() == 0) {
+ if (relPartitionPaths.isEmpty()) {
return Collections.emptyList();
}
List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
- if (pathInfoList.size() == 0) {
+ if (pathInfoList.isEmpty()) {
throw new HoodieException("No files found for reading in user provided
path.");
}
@@ -404,7 +428,7 @@ public class HoodieTableSource implements
switch (tableType) {
case MERGE_ON_READ:
final List<MergeOnReadInputSplit> inputSplits = buildInputSplits();
- if (inputSplits.size() == 0) {
+ if (inputSplits.isEmpty()) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for MERGE_ON_READ input
format, returns empty collection instead");
return InputFormats.EMPTY_INPUT_FORMAT;
@@ -536,7 +560,7 @@ public class HoodieTableSource implements
private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
final List<StoragePathInfo> pathInfoList = getReadFiles();
- if (pathInfoList.size() == 0) {
+ if (pathInfoList.isEmpty()) {
return InputFormats.EMPTY_INPUT_FORMAT;
}
@@ -560,7 +584,7 @@ public class HoodieTableSource implements
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
this.predicates,
this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, //
ParquetInputFormat always uses the limit value
- getParquetConf(this.conf, this.hadoopConf),
+ getParquetConf(this.conf, this.hadoopConf.unwrap()),
this.conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE),
this.internalSchemaManager
);
@@ -585,6 +609,20 @@ public class HoodieTableSource implements
return this.fileIndex;
}
+ private int[] getLookupKeys(int[][] keys) {
+ int[] keyIndices = new int[keys.length];
+ int i = 0;
+ for (int[] key : keys) {
+ if (key.length > 1) {
+ throw new UnsupportedOperationException(
+ "Hive lookup can not support nested key now.");
+ }
+ keyIndices[i] = key[0];
+ i++;
+ }
+ return keyIndices;
+ }
+
@VisibleForTesting
public Schema getTableAvroSchema() {
try {
@@ -623,7 +661,7 @@ public class HoodieTableSource implements
public List<StoragePathInfo> getReadFiles() {
FileIndex fileIndex = getOrBuildFileIndex();
List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
- if (relPartitionPaths.size() == 0) {
+ if (relPartitionPaths.isEmpty()) {
return Collections.emptyList();
}
return fileIndex.getFilesInPartitions();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
new file mode 100644
index 00000000000..3fc65ce12b4
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Lookup function for Hudi tables, used by Flink lookup joins.
+ *
+ * <p>Note: this mirrors Flink's FileSystemLookupFunction to avoid an additional
connector jar dependency.
+ */
+public class HoodieLookupFunction extends TableFunction<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieLookupFunction.class);
+
+ // the max number of retries before throwing exception, in case of failure
to load the table
+ // into cache
+ private static final int MAX_RETRIES = 3;
+ // interval between retries
+ private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
+
+ private final HoodieLookupTableReader partitionReader;
+ private final RowData.FieldGetter[] lookupFieldGetters;
+ private final Duration reloadInterval;
+ private final TypeSerializer<RowData> serializer;
+ private final RowType rowType;
+
+ // cache for lookup data
+ private transient Map<RowData, List<RowData>> cache;
+ // timestamp when cache expires
+ private transient long nextLoadTime;
+
+ public HoodieLookupFunction(
+ HoodieLookupTableReader partitionReader,
+ RowType rowType,
+ int[] lookupKeys,
+ Duration reloadInterval) {
+ this.partitionReader = partitionReader;
+ this.rowType = rowType;
+ this.lookupFieldGetters = new RowData.FieldGetter[lookupKeys.length];
+ for (int i = 0; i < lookupKeys.length; i++) {
+ lookupFieldGetters[i] =
+ RowData.createFieldGetter(rowType.getTypeAt(lookupKeys[i]),
lookupKeys[i]);
+ }
+ this.reloadInterval = reloadInterval;
+ this.serializer = InternalSerializers.create(rowType);
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ cache = new HashMap<>();
+ nextLoadTime = -1L;
+ }
+
+ @Override
+ public TypeInformation<RowData> getResultType() {
+ return InternalTypeInfo.of(rowType);
+ }
+
+ public void eval(Object... values) {
+ checkCacheReload();
+ RowData lookupKey = GenericRowData.of(values);
+ List<RowData> matchedRows = cache.get(lookupKey);
+ if (matchedRows != null) {
+ for (RowData matchedRow : matchedRows) {
+ collect(matchedRow);
+ }
+ }
+ }
+
+ private void checkCacheReload() {
+ if (nextLoadTime > System.currentTimeMillis()) {
+ return;
+ }
+ if (nextLoadTime > 0) {
+ LOG.info(
+ "Lookup join cache has expired after {} minute(s), reloading",
+ reloadInterval.toMinutes());
+ } else {
+ LOG.info("Populating lookup join cache");
+ }
+ int numRetry = 0;
+ while (true) {
+ cache.clear();
+ try {
+ long count = 0;
+ GenericRowData reuse = new GenericRowData(rowType.getFieldCount());
+ partitionReader.open();
+ RowData row;
+ while ((row = partitionReader.read(reuse)) != null) {
+ count++;
+ RowData rowData = serializer.copy(row);
+ RowData key = extractLookupKey(rowData);
+ List<RowData> rows = cache.computeIfAbsent(key, k -> new
ArrayList<>());
+ rows.add(rowData);
+ }
+ partitionReader.close();
+ nextLoadTime = System.currentTimeMillis() + reloadInterval.toMillis();
+ LOG.info("Loaded {} row(s) into lookup join cache", count);
+ return;
+ } catch (Exception e) {
+ if (numRetry >= MAX_RETRIES) {
+ throw new FlinkRuntimeException(
+ String.format(
+ "Failed to load table into cache after %d retries",
numRetry),
+ e);
+ }
+ numRetry++;
+ long toSleep = numRetry * RETRY_INTERVAL.toMillis();
+ LOG.warn(
+ String.format(
+ "Failed to load table into cache, will retry in %d seconds",
+ toSleep / 1000),
+ e);
+ try {
+ Thread.sleep(toSleep);
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while waiting to retry failed cache load,
aborting");
+ throw new FlinkRuntimeException(ex);
+ }
+ }
+ }
+ }
+
+ private RowData extractLookupKey(RowData row) {
+ GenericRowData key = new GenericRowData(lookupFieldGetters.length);
+ for (int i = 0; i < lookupFieldGetters.length; i++) {
+ key.setField(i, lookupFieldGetters[i].getFieldOrNull(row));
+ }
+ return key;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // no operation
+ }
+
+ @VisibleForTesting
+ public Duration getReloadInterval() {
+ return reloadInterval;
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
new file mode 100644
index 00000000000..0460cd42691
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.hudi.common.function.SerializableSupplier;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.table.data.RowData;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Hudi lookup table reader.
+ */
+public class HoodieLookupTableReader implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final SerializableSupplier<InputFormat<RowData, ?>>
inputFormatSupplier;
+ private final Configuration conf;
+
+ private InputFormat inputFormat;
+
+ public HoodieLookupTableReader(SerializableSupplier<InputFormat<RowData, ?>>
inputFormatSupplier, Configuration conf) {
+ this.inputFormatSupplier = inputFormatSupplier;
+ this.conf = conf;
+ }
+
+ public void open() throws IOException {
+ this.inputFormat = inputFormatSupplier.get();
+ inputFormat.configure(conf);
+ InputSplit[] inputSplits = inputFormat.createInputSplits(1);
+ ((RichInputFormat) inputFormat).openInputFormat();
+ inputFormat.open(inputSplits[0]);
+ }
+
+ @Nullable
+ public RowData read(RowData reuse) throws IOException {
+ if (!inputFormat.reachedEnd()) {
+ return (RowData) inputFormat.nextRecord(reuse);
+ }
+ return null;
+ }
+
+ public void close() throws IOException {
+ if (this.inputFormat != null) {
+ inputFormat.close();
+ }
+ if (inputFormat instanceof RichInputFormat) {
+ ((RichInputFormat) inputFormat).closeInputFormat();
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/SerializableSchema.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/SerializableSchema.java
new file mode 100644
index 00000000000..626a238178c
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/SerializableSchema.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.ROW;
+
+/**
+ * A serializable substitute for {@code ResolvedSchema}.
+ */
+public class SerializableSchema implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final List<Column> columns;
+
+ private SerializableSchema(List<Column> columns) {
+ this.columns = columns;
+ }
+
+ public static SerializableSchema create(ResolvedSchema resolvedSchema) {
+ List<Column> columns = resolvedSchema.getColumns().stream()
+ .filter(org.apache.flink.table.catalog.Column::isPhysical)
+ .map(column -> Column.create(column.getName(), column.getDataType()))
+ .collect(Collectors.toList());
+ return new SerializableSchema(columns);
+ }
+
+ public List<String> getColumnNames() {
+ return
this.columns.stream().map(Column::getName).collect(Collectors.toList());
+ }
+
+ public List<DataType> getColumnDataTypes() {
+ return
this.columns.stream().map(Column::getDataType).collect(Collectors.toList());
+ }
+
+ public Option<Column> getColumn(String columnName) {
+ return Option.fromJavaOptional(this.columns.stream().filter((col) ->
col.getName().equals(columnName)).findFirst());
+ }
+
+ public DataType toSourceRowDataType() {
+ return this.toRowDataType((c) -> true);
+ }
+
+ private DataType toRowDataType(Predicate<Column> predicate) {
+ final DataTypes.Field[] fieldsArray = columns.stream().filter(predicate)
+ .map(SerializableSchema::columnToField)
+ .toArray(DataTypes.Field[]::new);
+ // the row should never be null
+ return ROW(fieldsArray).notNull();
+ }
+
+ private static DataTypes.Field columnToField(Column column) {
+ return DataTypes.FIELD(column.getName(),
DataTypeUtils.removeTimeAttribute(column.getDataType()));
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+ public static class Column implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String name;
+ private final DataType dataType;
+
+ private Column(String name, DataType dataType) {
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ public static Column create(String name, DataType type) {
+ return new Column(name, type);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index 8ed8a391010..20a5cb34fdf 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -37,7 +38,6 @@ import
org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -75,7 +75,7 @@ public class TestFileIndex {
conf.setBoolean(METADATA_ENABLED, true);
conf.setBoolean(HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
- FileIndex fileIndex = FileIndex.builder().path(new
Path(tempFile.getAbsolutePath())).conf(conf)
+ FileIndex fileIndex = FileIndex.builder().path(new
StoragePath(tempFile.getAbsolutePath())).conf(conf)
.rowType(TestConfigurations.ROW_TYPE).build();
List<String> partitionKeys = Collections.singletonList("partition");
List<Map<String, String>> partitions =
@@ -99,7 +99,7 @@ public class TestFileIndex {
conf.setString(KEYGEN_CLASS_NAME,
NonpartitionedAvroKeyGenerator.class.getName());
conf.setBoolean(METADATA_ENABLED, true);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
- FileIndex fileIndex = FileIndex.builder().path(new
Path(tempFile.getAbsolutePath())).conf(conf)
+ FileIndex fileIndex = FileIndex.builder().path(new
StoragePath(tempFile.getAbsolutePath())).conf(conf)
.rowType(TestConfigurations.ROW_TYPE).build();
List<String> partitionKeys = Collections.singletonList("");
List<Map<String, String>> partitions =
@@ -117,7 +117,7 @@ public class TestFileIndex {
void testFileListingEmptyTable(boolean enableMetadata) {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(METADATA_ENABLED, enableMetadata);
- FileIndex fileIndex = FileIndex.builder().path(new
Path(tempFile.getAbsolutePath())).conf(conf)
+ FileIndex fileIndex = FileIndex.builder().path(new
StoragePath(tempFile.getAbsolutePath())).conf(conf)
.rowType(TestConfigurations.ROW_TYPE).build();
List<String> partitionKeys = Collections.singletonList("partition");
List<Map<String, String>> partitions =
@@ -140,7 +140,7 @@ public class TestFileIndex {
FileIndex fileIndex =
FileIndex.builder()
- .path(new Path(tempFile.getAbsolutePath()))
+ .path(new StoragePath(tempFile.getAbsolutePath()))
.conf(conf).rowType(TestConfigurations.ROW_TYPE_BIGINT)
.dataPruner(DataPruner.newInstance(Collections.singletonList(new
CallExpression(
FunctionIdentifier.of("greaterThan"),
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index dc253587d5b..78211e34a7f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -80,8 +80,8 @@ import java.util.stream.Stream;
import static org.apache.hudi.utils.TestConfigurations.catalog;
import static org.apache.hudi.utils.TestConfigurations.sql;
import static org.apache.hudi.utils.TestData.array;
-import static org.apache.hudi.utils.TestData.assertRowsEqualsUnordered;
import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.apache.hudi.utils.TestData.assertRowsEqualsUnordered;
import static org.apache.hudi.utils.TestData.map;
import static org.apache.hudi.utils.TestData.row;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -668,6 +668,38 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED);
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testLookupJoin(HoodieTableType tableType) {
+ TableEnvironment tableEnv = streamTableEnv;
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_NAME, tableType)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ String hoodieTableDDL2 = sql("t2")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_NAME, tableType)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL2);
+
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+ tableEnv.executeSql("create view t1_view as select *,"
+ + "PROCTIME() as proc_time from t1");
+
+ // Join two hudi tables with the same data
+ String sql = "insert into t2 select b.* from t1_view o "
+ + " join t1/*+ OPTIONS('lookup.join.cache.ttl'= '2 day') */ "
+ + " FOR SYSTEM_TIME AS OF o.proc_time AS b on o.uuid = b.uuid";
+ execInsertSql(tableEnv, sql);
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t2").execute().collect());
+
+ assertRowsEquals(result, TestData.DATA_SET_SOURCE_INSERT);
+ }
+
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 1d9db480d38..1e3550aee27 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -23,8 +23,10 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.prune.DataPruner;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -117,8 +119,8 @@ public class TestHoodieTableSource {
TestData.writeData(TestData.DATA_SET_INSERT, conf);
HoodieTableSource tableSource = new HoodieTableSource(
- TestConfigurations.TABLE_SCHEMA,
- new Path(tempFile.getPath()),
+ SerializableSchema.create(TestConfigurations.TABLE_SCHEMA),
+ new StoragePath(tempFile.getPath()),
Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
"default-par",
conf);
@@ -334,8 +336,8 @@ public class TestHoodieTableSource {
private HoodieTableSource createHoodieTableSource(Configuration conf) {
return new HoodieTableSource(
- TestConfigurations.TABLE_SCHEMA,
- new Path(conf.getString(FlinkOptions.PATH)),
+ SerializableSchema.create(TestConfigurations.TABLE_SCHEMA),
+ new StoragePath(conf.getString(FlinkOptions.PATH)),
Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
"default-par",
conf);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index eadf4ffb26c..00ed50b7514 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
@@ -41,6 +42,7 @@ import
org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -55,7 +57,6 @@ import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -1182,8 +1183,8 @@ public class TestInputFormat {
private HoodieTableSource getTableSource(Configuration conf) {
return new HoodieTableSource(
- TestConfigurations.TABLE_SCHEMA,
- new Path(tempFile.getAbsolutePath()),
+ SerializableSchema.create(TestConfigurations.TABLE_SCHEMA),
+ new StoragePath(tempFile.getAbsolutePath()),
Collections.singletonList("partition"),
"default",
conf);