sivabalan narayanan created HUDI-3864:
-----------------------------------------
Summary: Avoid fetching all files for all partitions on the
read/query path for flink
Key: HUDI-3864
URL: https://issues.apache.org/jira/browse/HUDI-3864
Project: Apache Hudi
Issue Type: Task
Components: flink
Reporter: sivabalan narayanan
Fetching all files across all partitions should be avoided in the hot path, especially on the query side. I inspected HoodieFileIndex for Spark and things look to be OK there: we only load files for the partitions involved in the query.
{code:java}
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
                                HoodieTableMetaClient metaClient,
                                TypedProperties configProperties,
                                HoodieTableQueryType queryType,
                                List<Path> queryPaths,
                                ...
{code}
The queryPaths argument above contains only the partitions involved in the split. Later, when we load the files, we load them only for the matched partitions.
{code:java}
private Map<PartitionPath, FileStatus[]> loadPartitionPathFiles() {
  // List files in all partition paths
  List<PartitionPath> pathToFetch = new ArrayList<>();
  Map<PartitionPath, FileStatus[]> cachedPartitionToFiles = new HashMap<>();

  // Fetch from the FileStatusCache
  List<PartitionPath> partitionPaths = getAllQueryPartitionPaths();
  partitionPaths.forEach(partitionPath -> {
    Option<FileStatus[]> filesInPartition = fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
    if (filesInPartition.isPresent()) {
      cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
    } else {
      pathToFetch.add(partitionPath);
    }
  });

  Map<PartitionPath, FileStatus[]> fetchedPartitionToFiles;

  if (pathToFetch.isEmpty()) {
    fetchedPartitionToFiles = Collections.emptyMap();
  } else {
    Map<String, PartitionPath> fullPartitionPathsMapToFetch = pathToFetch.stream()
        .collect(Collectors.toMap(
            partitionPath -> partitionPath.fullPartitionPath(basePath).toString(),
            Function.identity()));

    fetchedPartitionToFiles =
        FSUtils.getFilesInPartitions(
                engineContext,
                metadataConfig,
                basePath,
                fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
                fileSystemStorageConfig.getSpillableDir())
            .entrySet()
            .stream()
            .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
  }

  // Update the fileStatusCache
  fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
    fileStatusCache.put(partitionPath.fullPartitionPath(basePath), filesInPartition);
  });

  return CollectionUtils.combine(cachedPartitionToFiles, fetchedPartitionToFiles);
}
{code}
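The pattern above boils down to: consult the per-partition FileStatusCache, batch-list only the cache misses, then warm the cache and merge both maps. A minimal, self-contained sketch of the same cache-then-fetch pattern (the types and the batchLister parameter here are illustrative stand-ins, not Hudi's actual API):
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Illustrative stand-ins for Hudi's PartitionPath / FileStatus / FileStatusCache.
final class CacheThenFetchDemo {
  private final Map<String, List<String>> cache = new HashMap<>();

  Map<String, List<String>> loadPartitionFiles(
      List<String> queryPartitions,
      Function<Set<String>, Map<String, List<String>>> batchLister) {
    Map<String, List<String>> result = new HashMap<>();
    Set<String> misses = new HashSet<>();
    for (String partition : queryPartitions) {
      List<String> files = cache.get(partition);
      if (files != null) {
        result.put(partition, files); // cache hit: reuse prior listing
      } else {
        misses.add(partition);        // defer to one batched listing call
      }
    }
    if (!misses.isEmpty()) {
      Map<String, List<String>> fetched = batchLister.apply(misses);
      cache.putAll(fetched);          // warm the cache for subsequent queries
      result.putAll(fetched);
    }
    return result;
  }
}
{code}
The key property is that listing cost scales with the number of partitions the query touches, not with the total partition count of the table.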
But in Flink, I do see that we load files across all partitions. Let's try to see if this can be avoided.
IncrementalInputSplits [L180|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L180]:
fileStatuses = fileIndex.getFilesInPartitions();

HoodieTableSource [L298|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java#L298]:
FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
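If the Flink file index is indeed listing every partition at these call sites, one possible shape of the fix (a hedged sketch only; the PartitionPruning class and its arguments are hypothetical, not Hudi's actual API) is to intersect the index's partition paths with the partitions the query actually requires before any file listing happens:
{code:java}
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class PartitionPruning {
  // Keep only the partitions the current query/split actually needs,
  // so that the subsequent file listing never touches the rest.
  static List<String> prune(List<String> allPartitionPaths, Set<String> requiredPartitionPaths) {
    return allPartitionPaths.stream()
        .filter(requiredPartitionPaths::contains)
        .collect(Collectors.toList());
  }
}
{code}
getFilesInPartitions() would then only be invoked for the pruned set, mirroring what BaseHoodieTableFileIndex already does on the Spark side.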
I do see that we pass in the required partition paths in both places, but I will leave it to the Flink experts to inspect the code once and close out the ticket if no action is required.