This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5de762f6b2bc feat(flink): Support data skipping based on partitioned RLI (#19006)
5de762f6b2bc is described below
commit 5de762f6b2bc83a09fb82bade67554165f4d3392
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jun 18 14:23:18 2026 +0800
feat(flink): Support data skipping based on partitioned RLI (#19006)
---
.../java/org/apache/hudi/source/FileIndex.java | 8 +-
...rdLevelIndex.java => BaseRecordLevelIndex.java} | 64 +++--
.../hudi/source/stats/GlobalRecordLevelIndex.java | 54 ++++
.../apache/hudi/source/stats/RecordLevelIndex.java | 310 +++++++--------------
.../java/org/apache/hudi/source/TestFileIndex.java | 67 +++++
.../hudi/source/stats/TestRecordLevelIndex.java | 221 ++++++++++++++-
.../apache/hudi/table/ITTestHoodieDataSource.java | 48 ++++
.../test/java/org/apache/hudi/utils/TestData.java | 11 +
8 files changed, 534 insertions(+), 249 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 75d00ff136e8..23aa83b3ba40 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -29,8 +29,8 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.source.stats.BaseRecordLevelIndex;
import org.apache.hudi.source.stats.FileStatsIndex;
-import org.apache.hudi.source.stats.RecordLevelIndex;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.StreamerUtil;
@@ -73,7 +73,7 @@ public class FileIndex implements Serializable, AutoCloseable
{
private final Function<String, Integer> partitionBucketIdFunc; // for
bucket pruning
private List<String> partitionPaths; // cache of
partition paths
private final FileStatsIndex fileStatsIndex; // for data
skipping
- private final Option<RecordLevelIndex> recordLevelIndex;
+ private final Option<BaseRecordLevelIndex> recordLevelIndex;
private final HoodieTableMetaClient metaClient;
private FileIndex(
@@ -93,7 +93,7 @@ public class FileIndex implements Serializable, AutoCloseable
{
this.fileStatsIndex = new FileStatsIndex(path.toString(), rowType, conf,
metaClient);
this.partitionBucketIdFunc = partitionBucketIdFunc;
List<ExpressionEvaluators.Evaluator> evaluators =
Option.ofNullable(colStatsProbe).map(ColumnStatsProbe::getEvaluators).orElse(Collections.emptyList());
- this.recordLevelIndex = RecordLevelIndex.create(path.toString(), conf,
metaClient, evaluators, rowType);
+ this.recordLevelIndex = BaseRecordLevelIndex.create(path.toString(), conf,
metaClient, evaluators, rowType);
this.metaClient = metaClient;
}
@@ -288,7 +288,7 @@ public class FileIndex implements Serializable,
AutoCloseable {
@Override
public void close() {
this.fileStatsIndex.close();
- this.recordLevelIndex.ifPresent(RecordLevelIndex::close);
+ this.recordLevelIndex.ifPresent(BaseRecordLevelIndex::close);
this.partitionPruner.ifPresent(PartitionPruners.PartitionPruner::close);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java
similarity index 84%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
copy to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java
index 67e20e9e2282..beb76b02f525 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java
@@ -19,9 +19,10 @@
package org.apache.hudi.source.stats;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieDataUtils;
@@ -31,6 +32,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.record.HoodieRecordIndex;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -45,8 +47,6 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -58,20 +58,20 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
- * An index support implementation that leverages Record Level Index to prune
file slices.
+ * Base index support that leverages Record Level Index to prune file slices.
*/
@Slf4j
-public class RecordLevelIndex implements FlinkMetadataIndex {
+public abstract class BaseRecordLevelIndex implements FlinkMetadataIndex {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(RecordLevelIndex.class);
private final String basePath;
- private final Configuration conf;
- private final List<String> hoodieKeysFromFilter;
+ protected final Configuration conf;
+ protected final List<String> hoodieKeysFromFilter;
private final HoodieTableMetaClient metaClient;
private HoodieTableMetadata metadataTable;
- private RecordLevelIndex(
+ @VisibleForTesting
+ BaseRecordLevelIndex(
String basePath,
Configuration conf,
HoodieTableMetaClient metaClient,
@@ -109,24 +109,32 @@ public class RecordLevelIndex implements
FlinkMetadataIndex {
if (!isIndexAvailable()) {
return fileSlices;
}
- HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData = null;
+
try {
- recordIndexData =
-
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
- List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
- Set<String> fileIds = recordIndexLocations.stream()
- .map(pair ->
pair.getValue().getFileId()).collect(Collectors.toSet());
- return fileSlices.stream().filter(fileSlice ->
fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
+ Option<Set<HoodieFileGroupId>> candidateFileGroupIds =
+ lookupCandidateFileGroupIds(fileSlices);
+ return candidateFileGroupIds.map(candidates -> fileSlices.stream()
+ .filter(fileSlice -> candidates.contains(fileSlice.getFileGroupId()))
+ .collect(Collectors.toList()))
+ .orElse(fileSlices);
} catch (Throwable e) {
log.error("Failed to read metadata index: {} for data skipping",
getIndexPartitionName(), e);
return fileSlices;
- } finally {
- // Clean up the RDD to avoid memory leaks
-
Option.ofNullable(recordIndexData).ifPresent(HoodiePairData::unpersistWithDependencies);
}
}
- public static Option<RecordLevelIndex> create(
+ protected abstract Option<Set<HoodieFileGroupId>>
lookupCandidateFileGroupIds(List<FileSlice> fileSlices);
+
+ protected static Set<HoodieFileGroupId> getFileGroupIds(
+ HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData) {
+ List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
+ HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
+ return recordIndexLocations.stream()
+ .map(pair -> new HoodieFileGroupId(pair.getValue().getPartitionPath(),
pair.getValue().getFileId()))
+ .collect(Collectors.toSet());
+ }
+
+ public static Option<BaseRecordLevelIndex> create(
String basePath,
Configuration conf,
HoodieTableMetaClient metaClient,
@@ -145,22 +153,28 @@ public class RecordLevelIndex implements
FlinkMetadataIndex {
String[] recordKeyFields =
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
if (recordKeyFields.length == 0) {
- LOG.warn("The table do not have record keys, skipping the rli pruning.");
+ log.warn("The table do not have record keys, skipping the rli pruning.");
return Option.empty();
}
boolean consistentLogicalTimestampEnabled =
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
List<String> hoodieKeysFromFilter = computeHoodieKeyFromFilters(conf,
metaClient, evaluators, recordKeyFields, rowType,
consistentLogicalTimestampEnabled);
if (hoodieKeysFromFilter.isEmpty()) {
- LOG.warn("The number of keys from query predicate is empty, skipping the
rli pruning.");
+ log.warn("The number of keys from query predicate is empty, skipping the
rli pruning.");
return Option.empty();
}
int maxKeyNum = conf.get(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM);
if (hoodieKeysFromFilter.size() > maxKeyNum) {
- LOG.warn("The number of keys from query predicate: {} exceeds the upper
threshold: {}, skipping the rli pruning, the keys: {}",
+ log.warn("The number of keys from query predicate: {} exceeds the upper
threshold: {}, skipping the rli pruning, the keys: {}",
hoodieKeysFromFilter.size(), maxKeyNum, hoodieKeysFromFilter);
return Option.empty();
}
- return Option.of(new RecordLevelIndex(basePath, conf, metaClient,
hoodieKeysFromFilter));
+ HoodieIndexDefinition indexDefinition =
metaClient.getIndexForMetadataPartition(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX).orElse(null);
+ if (indexDefinition == null) {
+ return Option.empty();
+ }
+ return Option.of(HoodieRecordIndex.isPartitioned(indexDefinition)
+ ? new RecordLevelIndex(basePath, conf, metaClient,
hoodieKeysFromFilter)
+ : new GlobalRecordLevelIndex(basePath, conf, metaClient,
hoodieKeysFromFilter));
}
/**
@@ -196,7 +210,7 @@ public class RecordLevelIndex implements FlinkMetadataIndex
{
: normalizeLiteral(val, keyField, fieldType,
consistentLogicalTimestampEnabled)));
}
if (recordKeys.isEmpty()) {
- LOG.info("No literals found for the record key: {}, therefore
filtering can not be performed", keyField);
+ log.info("No literals found for the record key: {}, therefore
filtering can not be performed", keyField);
return Collections.emptyList();
} else if (!isComplexRecordKey || hoodieKeys.isEmpty()) {
hoodieKeys = recordKeys;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/GlobalRecordLevelIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/GlobalRecordLevelIndex.java
new file mode 100644
index 000000000000..9f93f8757e2c
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/GlobalRecordLevelIndex.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An index support implementation that leverages global Record Level Index to
prune file slices.
+ */
+public class GlobalRecordLevelIndex extends BaseRecordLevelIndex {
+ private static final long serialVersionUID = 1L;
+
+ GlobalRecordLevelIndex(
+ String basePath,
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ List<String> hoodieKeysFromFilter) {
+ super(basePath, conf, metaClient, hoodieKeysFromFilter);
+ }
+
+ @Override
+ protected Option<Set<HoodieFileGroupId>>
lookupCandidateFileGroupIds(List<FileSlice> fileSlices) {
+ HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
+
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
+ return Option.of(getFileGroupIds(recordIndexData));
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
index 67e20e9e2282..9263aaf5f9ac 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
@@ -22,262 +22,152 @@ import
org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.HoodieDataUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.keygen.KeyGenUtils;
-import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.sink.bulk.RowDataKeyGen;
-import org.apache.hudi.source.ExpressionEvaluators;
-import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
- * An index support implementation that leverages Record Level Index to prune
file slices.
+ * An index support implementation that leverages partitioned Record Level
Index to prune file slices.
+ *
+ * <p>Unlike the global RLI, record keys in partitioned RLI are only unique
within a data-table partition.
+ * This implementation therefore scopes metadata-table lookups by candidate
data partition. To avoid reading
+ * every RLI shard for each candidate partition, it first groups
filter-derived record keys by the shard they
+ * hash to within each partition, then performs one metadata lookup per {@code
(partition, shard)} group.
+ *
+ * <p>The grouped lookups use bounded local parallelism through {@link
HoodieFlinkEngineContext#map(List,
+ * org.apache.hudi.common.function.SerializableFunction, int)}. This is
intentionally not implemented with
+ * {@code HoodieFlinkEngineContext#parallelize}, because the Flink context
backs {@code parallelize} with
+ * in-memory {@code HoodieListData}, not distributed Flink tasks.
*/
@Slf4j
-public class RecordLevelIndex implements FlinkMetadataIndex {
+public class RecordLevelIndex extends BaseRecordLevelIndex {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(RecordLevelIndex.class);
- private final String basePath;
- private final Configuration conf;
- private final List<String> hoodieKeysFromFilter;
- private final HoodieTableMetaClient metaClient;
- private HoodieTableMetadata metadataTable;
+ /**
+ * Upper bound on the number of candidate data-table partitions eligible for
a partitioned RLI lookup.
+ *
+ * <p>Unlike the global RLI (a single lookup over all keys), the partitioned
variant performs separate metadata-table
+ * reads for each candidate partition (one per (partition, shard) group). When a query does not filter on the
partition column the candidate set can span
+ * many partitions, and fanning out a lookup to each one can add latency
that outweighs the skipping benefit.
+ * Once the candidate partition count exceeds this threshold, pruning is
skipped.
+ */
+ private static final int MAX_PARTITIONS = 3;
- private RecordLevelIndex(
+ RecordLevelIndex(
String basePath,
Configuration conf,
HoodieTableMetaClient metaClient,
List<String> hoodieKeysFromFilter) {
- this.basePath = basePath;
- this.conf = conf;
- this.metaClient = metaClient;
- this.hoodieKeysFromFilter = hoodieKeysFromFilter;
- }
-
- @Override
- public String getIndexPartitionName() {
- return HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX;
+ super(basePath, conf, metaClient, hoodieKeysFromFilter);
}
+ /**
+ * Finds candidate data file groups by querying partitioned RLI for the
record keys extracted from filters.
+ *
+ * <p>The method fails open through {@link
BaseRecordLevelIndex#computeCandidateFileSlices(List)} if metadata
+ * lookup fails. Returning {@link Option#empty()} is reserved for cases
where pruning should be skipped without
+ * treating it as an error, such as having no candidate partitions or exceeding the fixed
{@code MAX_PARTITIONS} candidate partition threshold.
+ */
@Override
- public boolean isIndexAvailable() {
- return metaClient.getTableConfig().isMetadataTableAvailable()
- &&
metaClient.getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
- }
-
- public HoodieTableMetadata getMetadataTable() {
- // initialize the metadata table lazily
- if (this.metadataTable == null) {
- this.metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
- HoodieFlinkEngineContext.DEFAULT,
- metaClient.getStorage(),
- StreamerUtil.metadataConfig(conf),
- basePath);
- }
- return this.metadataTable;
- }
-
- public List<FileSlice> computeCandidateFileSlices(List<FileSlice>
fileSlices) {
- if (!isIndexAvailable()) {
- return fileSlices;
- }
- HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData = null;
- try {
- recordIndexData =
-
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
- List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
- Set<String> fileIds = recordIndexLocations.stream()
- .map(pair ->
pair.getValue().getFileId()).collect(Collectors.toSet());
- return fileSlices.stream().filter(fileSlice ->
fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
- } catch (Throwable e) {
- log.error("Failed to read metadata index: {} for data skipping",
getIndexPartitionName(), e);
- return fileSlices;
- } finally {
- // Clean up the RDD to avoid memory leaks
-
Option.ofNullable(recordIndexData).ifPresent(HoodiePairData::unpersistWithDependencies);
- }
- }
-
- public static Option<RecordLevelIndex> create(
- String basePath,
- Configuration conf,
- HoodieTableMetaClient metaClient,
- List<ExpressionEvaluators.Evaluator> evaluators,
- RowType rowType) {
- if (evaluators.isEmpty() ||
!FlinkOptions.QUERY_TYPE_SNAPSHOT.equalsIgnoreCase(conf.get(FlinkOptions.QUERY_TYPE)))
{
+ protected Option<Set<HoodieFileGroupId>>
lookupCandidateFileGroupIds(List<FileSlice> fileSlices) {
+ Set<String> partitions = fileSlices.stream()
+ .map(FileSlice::getPartitionPath)
+ .collect(Collectors.toSet());
+ if (partitions.isEmpty()) {
return Option.empty();
}
- if (metaClient == null) {
- metaClient = StreamerUtil.createMetaClient(conf);
- }
- // disallow RLI for new encoding with complex key gen when the table
version is lower than NINE.
- if
(KeyGenUtils.mayUseNewEncodingForComplexKeyGen(metaClient.getTableConfig())) {
+ if (partitions.size() > MAX_PARTITIONS) {
+ log.info("The number of candidate partitions {} exceeds the partitioned
record level index lookup threshold {}. Skipping pruning.",
+ partitions.size(), MAX_PARTITIONS);
return Option.empty();
}
- String[] recordKeyFields =
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
- if (recordKeyFields.length == 0) {
- LOG.warn("The table do not have record keys, skipping the rli pruning.");
- return Option.empty();
- }
- boolean consistentLogicalTimestampEnabled =
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
- List<String> hoodieKeysFromFilter = computeHoodieKeyFromFilters(conf,
metaClient, evaluators, recordKeyFields, rowType,
consistentLogicalTimestampEnabled);
- if (hoodieKeysFromFilter.isEmpty()) {
- LOG.warn("The number of keys from query predicate is empty, skipping the
rli pruning.");
- return Option.empty();
- }
- int maxKeyNum = conf.get(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM);
- if (hoodieKeysFromFilter.size() > maxKeyNum) {
- LOG.warn("The number of keys from query predicate: {} exceeds the upper
threshold: {}, skipping the rli pruning, the keys: {}",
- hoodieKeysFromFilter.size(), maxKeyNum, hoodieKeysFromFilter);
- return Option.empty();
- }
- return Option.of(new RecordLevelIndex(basePath, conf, metaClient,
hoodieKeysFromFilter));
+ Set<HoodieFileGroupId> fileGroupIds = new HashSet<>();
+ HoodieTableMetadata metadataTable = getMetadataTable();
+ Map<String, List<FileSlice>> fileGroupsByDataPartition =
+
metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX);
+ // Build one lookup group per RLI shard within a data partition so each
metadata-table lookup
+ // only needs to resolve keys targeting that shard.
+ List<PartitionShardKeys> lookupGroups =
groupKeysByPartitionAndShard(partitions, fileGroupsByDataPartition)
+ .entrySet().stream()
+ .flatMap(partitionEntry -> partitionEntry.getValue().values().stream()
+ .map(keysInSingleShard -> new
PartitionShardKeys(partitionEntry.getKey(), keysInSingleShard)))
+ .collect(Collectors.toList());
+ // HoodieFlinkEngineContext#parallelize returns in-memory HoodieListData;
use map(...) for bounded
+ // local parallelism over the already bucketed (partition, shard) lookup
groups.
+ HoodieFlinkEngineContext.DEFAULT.map(
+ lookupGroups,
+ lookupGroup -> lookupFileGroupIds(metadataTable, lookupGroup),
+ getLookupParallelism(lookupGroups.size()))
+ .forEach(fileGroupIds::addAll);
+ return Option.of(fileGroupIds);
}
/**
- * Given query filters, it filters the EqualTo, IN and OR queries on record
key columns and
- * returns the list of record key literals present in the query, for example:
- * <p>
- * filter1: `key1` = 'val1', returns {"val1"}
- * filter2: `key1` in ('val1', 'val2', 'val3'), returns {"val1", "vale",
"val3"}
- * filter3: `key1` = 'val1' OR `key1` = 'val2' or `key1` = 'val3', returns
{"val1", "vale", "val3"}
- * filter4: `key1` = 'val1' AND `key2` in ('val2', 'val3'), returns
{"key1:val1,key2:val2", "key1:val1,key2:val3"}
+ * Groups filter-derived record keys by data-table partition and RLI shard.
+ *
+ * <p>The lookup API is partition-scoped. This helper additionally splits
keys by RLI shard from the
+ * bucketized partitioned-RLI file groups returned by the metadata table.
*/
- @VisibleForTesting
- public static List<String> computeHoodieKeyFromFilters(
- Configuration conf,
- HoodieTableMetaClient metaClient,
- List<ExpressionEvaluators.Evaluator> evaluators,
- String[] keyFields,
- RowType rowType,
- boolean consistentLogicalTimestampEnabled) {
- String[] partitionFields =
metaClient.getTableConfig().getPartitionFields().orElse(new String[0]);
- // align with the check logic in RowDataKeyGen
- boolean isComplexRecordKey = keyFields.length > 1 ||
partitionFields.length > 1 &&
!OptionsResolver.useComplexKeygenNewEncoding(conf);
- List<String> hoodieKeys = new ArrayList<>();
- List<String> fieldNames = rowType.getFieldNames();
- for (String keyField: keyFields) {
- List<String> recordKeys = new ArrayList<>();
- LogicalType fieldType = rowType.getTypeAt(fieldNames.indexOf(keyField));
- for (ExpressionEvaluators.Evaluator evaluator: evaluators) {
- // if there exists multiple ref fields in an evaluator, ignore this
evaluator, e.g., key = 'key1' or age = 20
- List<Object> literals = collectLiterals(evaluator, keyField);
- literals.forEach(val -> recordKeys.add(isComplexRecordKey
- ? keyField + KeyGenerator.DEFAULT_COLUMN_VALUE_SEPARATOR +
normalizeLiteral(val, keyField, fieldType, consistentLogicalTimestampEnabled)
- : normalizeLiteral(val, keyField, fieldType,
consistentLogicalTimestampEnabled)));
- }
- if (recordKeys.isEmpty()) {
- LOG.info("No literals found for the record key: {}, therefore
filtering can not be performed", keyField);
- return Collections.emptyList();
- } else if (!isComplexRecordKey || hoodieKeys.isEmpty()) {
- hoodieKeys = recordKeys;
- } else {
- // Combine literals for this configured record key with literals for
the other configured record keys
- // If there are two literals for rk1, rk2, rk3 each. A total of 8
combinations will be generated
- List<String> tmpHoodieKeys = new ArrayList<>();
- for (String compositeKey: hoodieKeys) {
- for (String recordKey: recordKeys) {
- tmpHoodieKeys.add(compositeKey +
KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + recordKey);
- }
- }
- hoodieKeys = tmpHoodieKeys;
- }
- }
- return hoodieKeys;
+ private Map<String, Map<Integer, List<String>>> groupKeysByPartitionAndShard(
+ Set<String> partitions,
+ Map<String, List<FileSlice>> fileGroupsByDataPartition) {
+ return partitions.stream().collect(Collectors.toMap(
+ partition -> partition,
+ partition -> {
+ List<FileSlice> fileGroups =
fileGroupsByDataPartition.get(partition);
+ ValidationUtils.checkState(fileGroups != null &&
!fileGroups.isEmpty(),
+ "No record index file groups found for data partition: " +
partition);
+ return hoodieKeysFromFilter.stream().collect(Collectors.groupingBy(
+ key -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
fileGroups.size()),
+ Collectors.toCollection(ArrayList::new)));
+ }));
}
/**
- * Collect literal values for record key fields from the predicate.
+ * Looks up one pre-bucketed {@code (partition, shard)} group and converts
matched RLI locations to file group IDs.
*/
- private static List<Object> collectLiterals(ExpressionEvaluators.Evaluator
evaluator, String refName) {
- if (evaluator instanceof ExpressionEvaluators.LeafEvaluator
- && !((ExpressionEvaluators.LeafEvaluator)
evaluator).getName().equalsIgnoreCase(refName)) {
- return Collections.emptyList();
- }
- if (evaluator instanceof ExpressionEvaluators.EqualTo) {
- Object valueLiteral = ((ExpressionEvaluators.EqualTo)
evaluator).getVal();
- return valueLiteral == null ? Collections.emptyList() :
Collections.singletonList(valueLiteral);
- } else if (evaluator instanceof ExpressionEvaluators.In) {
- Object[] valueLiterals = ((ExpressionEvaluators.In) evaluator).getVals();
- if (valueLiterals.length < 1 ||
Arrays.stream(valueLiterals).anyMatch(Objects::isNull)) {
- return Collections.emptyList();
- }
- return Arrays.stream(valueLiterals).collect(Collectors.toList());
- } else if (evaluator instanceof ExpressionEvaluators.Or) {
- List<List<Object>> literalsList =
Arrays.stream(((ExpressionEvaluators.Or) evaluator).getEvaluators())
- .map(eval -> collectLiterals(eval,
refName)).collect(Collectors.toList());
- // if any child expr do not contain predicate on the key, just return
empty list
- if (literalsList.stream().anyMatch(List::isEmpty)) {
- return Collections.emptyList();
- }
- return
literalsList.stream().flatMap(List::stream).distinct().collect(Collectors.toList());
- } else {
- return Collections.emptyList();
- }
+ private static Set<HoodieFileGroupId> lookupFileGroupIds(
+ HoodieTableMetadata metadataTable,
+ PartitionShardKeys lookupGroup) {
+ HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
+ metadataTable.readRecordIndexLocationsWithKeys(
+ HoodieListData.eager(lookupGroup.keysInSingleShard),
Option.of(lookupGroup.partition));
+ return getFileGroupIds(recordIndexData);
}
/**
- * Normalize literal values before used to get record index locations.
+ * Caps local lookup parallelism by available processors and the number of
non-empty lookup groups.
*/
- private static String normalizeLiteral(Object value, String keyField,
LogicalType fieldType, boolean consistentLogicalTimestampEnabled) {
- switch (fieldType.getTypeRoot()) {
- case DECIMAL:
- // the scale of decimal data in predicate may not be aligned with that
in record index, padding 0 if necessary,
- // e.g., 1.11 with target scale 5, return 1.11000
- BigDecimal decimal = (BigDecimal) value;
- int targetScale = ((DecimalType) fieldType).getScale();
- value = decimal.scale() >= targetScale ? value :
decimal.setScale(targetScale);
- break;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- // the original value is extracted from literal by
ExpressionUtils#getValueFromLiteral, which is epoch millis
- // convert it back to TimestampData before reusing key generating
logic in RowDataKeyGen.
- value = TimestampData.fromEpochMillis((Long) value);
- break;
- default:
- break;
- }
- // to align with the hoodie key generating logic in writer side.
- return RowDataKeyGen.getRecordKey(value, keyField,
consistentLogicalTimestampEnabled);
+ private static int getLookupParallelism(int lookupGroupCount) {
+ return Math.max(1, Math.min(lookupGroupCount,
Runtime.getRuntime().availableProcessors()));
}
- @Override
- public void close() {
- if (this.metadataTable == null) {
- return;
- }
- try {
- this.metadataTable.close();
- } catch (Exception e) {
- throw new HoodieException("Exception happened during close metadata
table.", e);
+ /**
+ * Keys that hash to one RLI shard within one data-table partition.
+ */
+ private static class PartitionShardKeys {
+ private final String partition;
+ private final List<String> keysInSingleShard;
+
+ private PartitionShardKeys(String partition, List<String>
keysInSingleShard) {
+ this.partition = partition;
+ this.keysInSingleShard = keysInSingleShard;
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index ae7aaa022df3..d421f6e72098 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -260,6 +260,73 @@ public class TestFileIndex {
assertThat(fileSlices.size(), is(expectedCnt));
}
+ /**
+ * Verifies that file listing with the partitioned record level index enabled returns the
+ * expected number of file slices for each probe / record-key combination in filtersAndResults.
+ */
+ @ParameterizedTest
+ @MethodSource("filtersAndResults")
+ void testFileListingWithPartitionedRecordLevelIndex(String recordFields,
ColumnStatsProbe probe, int maxKeyCnt, int expectedCnt) throws Exception {
+ DataType dataType = TestConfigurations.ROW_DATA_TYPE_WITH_ATOMIC_TYPES;
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(), dataType);
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, recordFields);
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM, maxKeyCnt);
+ // Enable partitioned record level index specifically for this test
+ conf.setString(HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
+
+ // Write test data
+ TestData.writeData(TestData.DATA_SET_WITH_ATOMIC_TYPES, conf);
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ // Build the file index with the probe supplied by filtersAndResults; its predicates on the
+ // configured record key field(s) are what may trigger record-level index pruning
+ FileIndex fileIndex =
+ FileIndex.builder()
+ .path(new StoragePath(tempFile.getAbsolutePath()))
+ .conf(conf)
+ .rowType((RowType) dataType.getLogicalType())
+ .metaClient(metaClient)
+ .columnStatsProbe(probe)
+ .build();
+
+ // Get filtered file slices - this should use partitioned record-level
index data skipping
+ List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+ assertThat(fileSlices.size(), is(expectedCnt));
+ }
+
+ /**
+ * Verifies that partitioned record level index pruning is skipped when the number of
+ * candidate partitions exceeds the partition threshold, so every file slice is returned.
+ */
+ @Test
+ void testFileListingWithPartitionedRecordLevelIndexExceedingMaxPartitions()
throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.setString(HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
+
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ // record key predicate `uuid` = 'id1' would normally prune to a single file slice
+ CallExpression equalExpr = CallExpression.permanent(
+ BuiltInFunctionDefinitions.EQUALS,
+ Arrays.asList(
+ new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+ new ValueLiteralExpression("id1", DataTypes.STRING().notNull())
+ ),
+ DataTypes.BOOLEAN());
+ ColumnStatsProbe probe =
ColumnStatsProbe.newInstance(Collections.singletonList(equalExpr));
+ FileIndex fileIndex =
+ FileIndex.builder()
+ .path(new StoragePath(tempFile.getAbsolutePath()))
+ .conf(conf)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .metaClient(metaClient)
+ .columnStatsProbe(probe)
+ .build();
+
+ // the number of candidate partitions (4) exceeds the partition threshold (3); no threshold
+ // option is set in this test, so presumably the default threshold applies,
+ // so partitioned record level index pruning is skipped and all file
slices are returned
+ List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+ assertThat(fileSlices.size(), is(4));
+ }
+
private static Stream<Arguments> filtersAndResults() {
CallExpression equalTinyInt = CallExpression.permanent(
BuiltInFunctionDefinitions.EQUALS,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
index a23a5247139c..1aae2af4ed7c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
@@ -18,10 +18,19 @@
package org.apache.hudi.source.stats;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListPairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.source.ExpressionEvaluators;
import org.apache.hudi.utils.TestConfigurations;
@@ -49,12 +58,28 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.hudi.utils.TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -75,6 +100,182 @@ public class TestRecordLevelIndex {
.notNull();
private static final RowType ROW_TYPE_MULTI_KEYS = (RowType)
ROW_DATA_TYPE_MULTI_KEYS.getLogicalType();
+ /**
+ * Verifies that, for a partition whose partitioned RLI has two file groups, record keys are
+ * looked up once per shard group with the partition passed explicitly, and the global
+ * (partition-less) lookup is never used.
+ */
+ @Test
+ void testPartitionedRliGroupsKeysByShardWithinPartition() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ // two keys guaranteed to map to different shards out of two
+ List<String> recordKeys = keysAcrossShards(2);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable,
recordKeys);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par1", "file2"));
+ Map<String, List<FileSlice>> partitionedRliFileGroups = new HashMap<>();
+ partitionedRliFileGroups.put("par1", Arrays.asList(
+ fileSlice("par1", "rli-file1"),
+ fileSlice("par1", "rli-file2")));
+
when(metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX))
+ .thenReturn(partitionedRliFileGroups);
+ doReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of(recordKeys.get(0), new HoodieRecordGlobalLocation("par1",
"001", "file1")))))
+ .when(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ // only file1 is reported by the stubbed lookup, so file2 is pruned
+ assertEquals(Collections.singletonList(fileSlices.get(0)), result);
+ // one partition-scoped lookup is expected per distinct shard the keys map to
+ Set<Set<String>> expectedKeyGroups = recordKeys.stream()
+ .collect(Collectors.groupingBy(key ->
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, 2)))
+ .values().stream()
+ .map(HashSet::new)
+ .collect(Collectors.toSet());
+ verify(metadataTable,
times(expectedKeyGroups.size())).readRecordIndexLocationsWithKeys(
+ argThat(keys -> expectedKeyGroups.contains(new
HashSet<>(((HoodieData<String>) keys).collectAsList()))),
+ eq(Option.of("par1")));
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ /**
+ * Verifies that with three candidate partitions (at the threshold, per the test name) the
+ * partitioned RLI is still consulted: one partition-scoped lookup per partition.
+ */
+ @Test
+ void testPartitionedRliPrunesAtPartitionThreshold() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"),
+ fileSlice("par3", "file3"));
+
when(metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX))
+ .thenReturn(partitionedRliFileGroups("par1", "par2", "par3"));
+ // each lookup for partition parN answers id1 located in fileN of that partition
+ doAnswer(invocation -> {
+ String partition = invocation.<Option<String>>getArgument(1).get();
+ return HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation(
+ partition, "001", "file" + partition.substring(3)))));
+ })
+ .when(metadataTable).readRecordIndexLocationsWithKeys(any(), any());
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertEquals(fileSlices, result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par3")));
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ /**
+ * Verifies that with four candidate partitions (above the threshold, per the test name) the
+ * metadata table is never queried and the input list is returned unchanged.
+ */
+ @Test
+ void testPartitionedRliSkipsPruningAbovePartitionThreshold() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"),
+ fileSlice("par3", "file3"),
+ fileSlice("par4", "file4"));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ // the very same list instance is returned, i.e. no pruning was attempted
+ assertSame(fileSlices, result);
+ verify(metadataTable,
never()).getBucketizedFileGroupsForPartitionedRLI(any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ /**
+ * Verifies that a failing lookup in one partition makes the index fall back to returning the
+ * unpruned input list, even though another partition's lookup succeeded.
+ */
+ @Test
+ void testPartitionedRliFallsBackWhenPartitionLookupFails() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"));
+
when(metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX))
+ .thenReturn(partitionedRliFileGroups("par1", "par2"));
+ doReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001",
"file1")))))
+ .when(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+ // the lookup for par2 throws
+ doThrow(new RuntimeException("lookup failure"))
+ .when(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertSame(fileSlices, result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+ }
+
+ /**
+ * Verifies that an empty candidate list is returned as-is without touching the metadata table.
+ */
+ @Test
+ void testPartitionedRliSkipsPruningForNoCandidateFiles() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Collections.emptyList();
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertSame(fileSlices, result);
+ verify(metadataTable,
never()).getBucketizedFileGroupsForPartitionedRLI(any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ /**
+ * Verifies that the global RLI matches on both partition and file id, and only uses the
+ * partition-less lookup.
+ */
+ @Test
+ void testGlobalRliMatchesPartitionAndFileId() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ GlobalRecordLevelIndex recordLevelIndex =
createGlobalRecordLevelIndex(metadataTable);
+ // both slices share the file id "file1" but live in different partitions
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file1"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any()))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001",
"file1")))));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ // only the par1 slice matches the reported (partition, file id) location
+ assertEquals(Collections.singletonList(fileSlices.get(0)), result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ }
+
+ /**
+ * Convenience overload that builds the index for the single record key {@code id1}.
+ */
+ private RecordLevelIndex createRecordLevelIndex(HoodieTableMetadata metadataTable) {
+ List<String> singleKey = Collections.singletonList("id1");
+ return createRecordLevelIndex(metadataTable, singleKey);
+ }
+
+ /**
+ * Creates a spied {@link RecordLevelIndex} over {@code recordKeys} whose metadata table is the
+ * given mock, after stubbing the table config to report an available record index.
+ */
+ private RecordLevelIndex createRecordLevelIndex(HoodieTableMetadata
metadataTable, List<String> recordKeys) {
+ mockAvailableRecordIndex();
+ RecordLevelIndex recordLevelIndex = spy(
+ new RecordLevelIndex("", new Configuration(), metaClient, recordKeys));
+ // doReturn(...) stubs the spy without invoking the real getMetadataTable(); lenient() since
+ // presumably not every test reaches the metadata table (e.g. when pruning is skipped)
+
lenient().doReturn(metadataTable).when(recordLevelIndex).getMetadataTable();
+ return recordLevelIndex;
+ }
+
+ /**
+ * Creates a spied {@link GlobalRecordLevelIndex} for the single record key {@code id1} whose
+ * metadata table is the given mock, after stubbing the table config to report an available
+ * record index.
+ */
+ private GlobalRecordLevelIndex
createGlobalRecordLevelIndex(HoodieTableMetadata metadataTable) {
+ mockAvailableRecordIndex();
+ GlobalRecordLevelIndex recordLevelIndex = spy(
+ new GlobalRecordLevelIndex("", new Configuration(), metaClient,
Collections.singletonList("id1")));
+ // doReturn(...) stubs the spy without invoking the real getMetadataTable()
+
lenient().doReturn(metadataTable).when(recordLevelIndex).getMetadataTable();
+ return recordLevelIndex;
+ }
+
+ /**
+ * Stubs the table config so the metadata table is available and exposes the record index partition.
+ */
+ private void mockAvailableRecordIndex() {
+ Set<String> metadataPartitions = Collections.singleton(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+ when(metaClient.getTableConfig()).thenReturn(tableConfig);
+ when(tableConfig.isMetadataTableAvailable()).thenReturn(true);
+ when(tableConfig.getMetadataPartitions()).thenReturn(metadataPartitions);
+ }
+
+ /** Builds a file slice with base instant "001" for the given partition and file id. */
+ private static FileSlice fileSlice(String partition, String fileId) {
+ HoodieFileGroupId groupId = new HoodieFileGroupId(partition, fileId);
+ return new FileSlice(groupId, "001");
+ }
+
+ /**
+ * Maps each given partition to a single RLI file slice whose file id is {@code "rli-" + partition}.
+ */
+ private static Map<String, List<FileSlice>> partitionedRliFileGroups(String... partitions) {
+ Map<String, List<FileSlice>> fileGroupsByPartition = new HashMap<>();
+ for (String partition : partitions) {
+ FileSlice rliSlice = fileSlice(partition, "rli-" + partition);
+ fileGroupsByPartition.put(partition, Collections.singletonList(rliSlice));
+ }
+ return fileGroupsByPartition;
+ }
+
+ /**
+ * Returns one record key per RLI shard for {@code fileGroupCount} shards: candidate keys
+ * id0, id1, ... are generated in order and the first key hashing to each shard is kept.
+ */
+ private static List<String> keysAcrossShards(int fileGroupCount) {
+ Map<Integer, String> firstKeyPerShard = new HashMap<>();
+ int suffix = 0;
+ while (firstKeyPerShard.size() < fileGroupCount) {
+ String candidate = "id" + suffix;
+ suffix++;
+ int shard = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(candidate, fileGroupCount);
+ firstKeyPerShard.putIfAbsent(shard, candidate);
+ }
+ return firstKeyPerShard.values().stream().collect(Collectors.toList());
+ }
+
private List<ExpressionEvaluators.Evaluator>
createColumnStatsProbe(BuiltInFunctionDefinition func, String refName,
List<String> vals) {
List<ResolvedExpression> args = vals.stream().map(
val -> new ValueLiteralExpression(val,
DataTypes.STRING().notNull())).collect(Collectors.toList());
@@ -125,7 +326,7 @@ public class TestRecordLevelIndex {
List<ExpressionEvaluators.Evaluator> evaluators = createColumnStatsProbe(
BuiltInFunctionDefinitions.EQUALS, "uuid",
Collections.singletonList("id1"));
String[] recordKeyFields = {"uuid"};
- List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, evaluators, recordKeyFields,
TestConfigurations.ROW_TYPE, false);
assertEquals(Collections.singletonList("id1"), result, "Should return the
simple record key value");
}
@@ -142,7 +343,7 @@ public class TestRecordLevelIndex {
// Test with IN operator
List<ExpressionEvaluators.Evaluator> evaluators =
createColumnStatsProbe(BuiltInFunctionDefinitions.IN, "uuid",
Arrays.asList("id1", "id2", "id3"));
String[] recordKeyFields = {"uuid"};
- List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, evaluators, recordKeyFields,
TestConfigurations.ROW_TYPE, false);
assertEquals(Arrays.asList("id1", "id2", "id3"), result, "Should return
all the IN operator values");
}
@@ -159,7 +360,7 @@ public class TestRecordLevelIndex {
// Test with OR operator (which should be converted to IN)
List<ExpressionEvaluators.Evaluator> evaluators =
createOrColumnStatsProbe("uuid", Arrays.asList("id1", "id2", "id3"));
String[] recordKeyFields = {"uuid"};
- List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, evaluators, recordKeyFields,
TestConfigurations.ROW_TYPE, false);
// Note: OR with two values "id1" and "id2" should result in the literals
from both evaluators
assertEquals(Arrays.asList("id1", "id2", "id3"), result, "Should return
values from OR operator");
@@ -188,7 +389,7 @@ public class TestRecordLevelIndex {
DataTypes.BOOLEAN())
);
String[] recordKeyFields = {"key1", "key2"};
- List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, ExpressionEvaluators.fromExpression(expressions),
recordKeyFields, ROW_TYPE_MULTI_KEYS, false);
// For complex keys, the format should be key1:val1,key2:val2
assertEquals(Arrays.asList("key1:val1" +
KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + "key2:val2"), result,
@@ -223,7 +424,7 @@ public class TestRecordLevelIndex {
);
String[] recordKeyFields = {"f_timestamp", "f_decimal"};
RowType rowType = (RowType)
ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getLogicalType();
- List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, ExpressionEvaluators.fromExpression(expressions),
recordKeyFields, rowType, consistentLogicalTimestampEnabled);
String expectedTimestampVal =
RowDataKeyGen.getRecordKey(TimestampData.fromEpochMillis(1), "f_timestamp",
consistentLogicalTimestampEnabled);
assertEquals(Arrays.asList("f_timestamp:" + expectedTimestampVal +
KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + "f_decimal:1.10"), result,
@@ -254,7 +455,7 @@ public class TestRecordLevelIndex {
DataTypes.BOOLEAN())
);
String[] recordKeyFields = {"uuid"};
- List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, ExpressionEvaluators.fromExpression(expressions),
recordKeyFields, TestConfigurations.ROW_TYPE, false);
// This should return both values
assertEquals(Arrays.asList("id1", "id2"), result, "Should return multiple
values for same field");
@@ -273,7 +474,7 @@ public class TestRecordLevelIndex {
List<ExpressionEvaluators.Evaluator> evaluators = createColumnStatsProbe(
BuiltInFunctionDefinitions.EQUALS, "nonKeyField",
Collections.singletonList("val1"));
String[] recordKeyFields = {"uuid"};
- List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, evaluators, recordKeyFields,
TestConfigurations.ROW_TYPE, false);
assertEquals(Collections.emptyList(), result, "Should return empty list
when filtering on non-record key field");
@@ -293,7 +494,7 @@ public class TestRecordLevelIndex {
DataTypes.BOOLEAN());
evaluators =
ExpressionEvaluators.fromExpression(Collections.singletonList(orExpression));
- result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, evaluators, recordKeyFields,
TestConfigurations.ROW_TYPE, false);
assertEquals(Collections.emptyList(), result, "Should return empty list
when filtering on or predicate including multiple fields");
}
@@ -324,7 +525,7 @@ public class TestRecordLevelIndex {
DataTypes.BOOLEAN())
);
String[] recordKeyFields = {"key1", "key2"};
- List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
conf, metaClient, ExpressionEvaluators.fromExpression(expressions),
recordKeyFields, ROW_TYPE_MULTI_KEYS, false);
// Should have 4 combinations: (val1,val3), (val1,val4), (val2,val3),
(val2,val4)
List<String> expected = Arrays.asList(
@@ -335,4 +536,4 @@ public class TestRecordLevelIndex {
);
assertEquals(expected, result, "Should return all combinations of complex
record keys");
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 749da56c8a16..dbe596007f7a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -614,6 +614,54 @@ public class ITTestHoodieDataSource {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
+ /**
+ * Verifies data skipping with the partitioned record level index for single- and
+ * multi-partition key lookups, including a record key present in more than one partition and a
+ * partition predicate naming a partition that does not exist.
+ */
+ @ParameterizedTest
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
+ void testDataSkippingWithPartitionedRecordLevelIndex(
+ HoodieTableType tableType, boolean useSourceV2) throws Exception {
+ // writer table uses the record level index type (presumably populating the RLI on write)
+ String writerTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name())
+ .option(FlinkOptions.TABLE_TYPE, tableType.name())
+ .end();
+ streamTableEnv.executeSql(writerTableDDL);
+ execInsertSql(streamTableEnv, TestSQL.INSERT_T1);
+
+ // separate reader table over the same path with data skipping enabled
+ String readerTableDDL = sql("t1_read")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+ .option(FlinkOptions.TABLE_TYPE, tableType.name())
+ .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
+ .end();
+ batchTableEnv.executeSql(readerTableDDL);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> batchTableEnv.sqlQuery(
+ "select * from t1_read where `partition` = 'par1' and uuid =
'id1'").execute().collect());
+ assertRowsEquals(result, "[+I[id1, Danny, 23, 1970-01-01T00:00:01,
par1]]");
+
+ List<Row> multiPartitionResult = CollectionUtil.iterableToList(
+ () -> batchTableEnv.sqlQuery(
+ "select * from t1_read where `partition` in ('par1', 'par4') and
uuid in ('id1', 'id7')").execute().collect());
+ assertRowsEquals(multiPartitionResult, "["
+ + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4]]");
+
+ // insert a record with key id1 into partition par4, so the same key now exists in both par1 and par4
+ String insert = "insert into t1 values ('id1','Jack',23,TIMESTAMP
'1970-01-01 00:00:01','par4')";
+ execInsertSql(streamTableEnv, insert);
+ // query whose partition predicate also names a partition (par5) that doesn't exist
+ multiPartitionResult = CollectionUtil.iterableToList(
+ () -> batchTableEnv.sqlQuery(
+ "select * from t1_read where `partition` in ('par1', 'par4',
'par5') and uuid in ('id1', 'id7')").execute().collect());
+ assertRowsEqualsUnordered(multiPartitionResult,
+ Arrays.asList(
+ "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]",
+ "+I[id1, Jack, 23, 1970-01-01T00:00:01, par4]",
+ "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4]"));
+ }
+
@ParameterizedTest
@MethodSource("tableTypeAndSourceV2AndBooleanTrueFalseParams")
void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean
useSourceV2, boolean hiveStylePartitioning) throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 2c8f5c29640f..e227e8bd71d4 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -674,6 +674,17 @@ public class TestData {
return obj == null ? "null" : obj.toString();
}
+ /**
+ * Assert that expected row strings and actual collection of rows are equal regardless of the order.
+ *
+ * <p>Duplicates are significant: both sides are compared as sorted lists rather than sets, so a
+ * duplicated, missing or extra row is reported as a mismatch.
+ *
+ * @param rows Actual result rows
+ * @param expected Expected row strings
+ */
+ public static void assertRowsEqualsUnordered(List<Row> rows, List<String> expected) {
+ List<String> actualRows = rows.stream().map(Row::toString).sorted().collect(Collectors.toList());
+ List<String> expectedRows = expected.stream().sorted().collect(Collectors.toList());
+ assertEquals(expectedRows, actualRows);
+ }
+
/**
* Sort the {@code rows} using field at index 0 and asserts
* it equals with the expected string {@code expected}.