nsivabalan commented on code in PR #18136:
URL: https://github.com/apache/hudi/pull/18136#discussion_r2800668952
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java:
##########
@@ -99,25 +100,25 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
   private transient HoodieLocalEngineContext engineContext;
-
   private transient HoodieStorage storage;

   public HoodieROTablePathFilter() {
-    this(new Configuration());
+    this(HadoopFSUtils.getStorageConf());
Review Comment:
+1
Let's take a look at all implementations extending HoodieBaseHadoopFsRelationFactory and update them in the same way.
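
For context, the change swaps a freshly constructed Hadoop Configuration for Hudi's storage configuration abstraction. A minimal sketch of the before/after pattern, assuming a delegating constructor that accepts a StorageConfiguration (the overload's exact signature is not shown in this hunk):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;
    import org.apache.hudi.storage.StorageConfiguration;

    public class HoodieROTablePathFilter {

      private final StorageConfiguration<Configuration> storageConf;

      public HoodieROTablePathFilter() {
        // Before: this(new Configuration()) built a fresh, empty Hadoop config.
        // After: derive the StorageConfiguration via HadoopFSUtils (no-arg
        // overload, as used in the diff) so the filter and HoodieStorage are
        // configured consistently from one place.
        this(HadoopFSUtils.getStorageConf());
      }

      // Assumed delegating constructor; the real overload may differ.
      public HoodieROTablePathFilter(StorageConfiguration<Configuration> storageConf) {
        this.storageConf = storageConf;
      }
    }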
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
-    try (HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files in {} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.orElse("N/A"),
+        timer.endTimer(), partitions.size());
+
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      // Group files by partition path, building one FileSlice per base file
+      Map<String, PartitionPath> partitionsMap = new HashMap<>();
+      partitions.forEach(p -> partitionsMap.put(p.path, p));
+      Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new HashMap<>();
+
+      for (StoragePathInfo pathInfo : allFiles) {
+        // Create a FileSlice from the StoragePathInfo.
+        String partitionPathStr = pathInfo.getPath().getParent().toString();
+        String relPartitionPath = FSUtils.getRelativePartitionPath(basePath, pathInfo.getPath().getParent());
+        HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
+        FileSlice fileSlice = new FileSlice(partitionPathStr, baseFile.getCommitTime(), baseFile.getFileId());
+        fileSlice.setBaseFile(baseFile);
+
+        // Add the FileSlice to partitionToFileSlices
+        PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
+        List<FileSlice> fileSlices = partitionToFileSlices.computeIfAbsent(partitionPathObj, k -> new ArrayList<>());
+        fileSlices.add(fileSlice);
Review Comment:
Can we avoid this special handling? Let's route all the files into the FSV so that we maintain one flow for all cases. The only difference is that the input files may already have been filtered (if the path filter is applied) or may refer to all files (if no path filter is applied). That is much simpler from a maintainability standpoint.
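
To make the suggestion concrete, here is a hedged sketch of the single flow inside loadFileSlicesForPartitions, with the listing always routed through the file system view. Names mirror the diff; the choice of getLatestFileSlicesBeforeOrOn / getLatestFileSlices as the view accessors is an assumption for illustration, not the PR's actual code, and java.util.stream.Collectors is assumed imported:

    // All listed files (pre-filtered by the RO path filter or not) feed one
    // HoodieTableFileSystemView, so slice grouping lives in a single place.
    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
    try (HoodieTableFileSystemView fileSystemView =
             new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
      Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new HashMap<>();
      for (PartitionPath partition : partitions) {
        List<FileSlice> fileSlices = queryInstant
            // Bound slices by the query instant when one is present...
            .map(instant -> fileSystemView.getLatestFileSlicesBeforeOrOn(partition.path, instant, true))
            // ...otherwise take the latest file slices as-is.
            .orElseGet(() -> fileSystemView.getLatestFileSlices(partition.path))
            .collect(Collectors.toList());
        partitionToFileSlices.put(partition, fileSlices);
      }
      return partitionToFileSlices;
    }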