alexeykudinkin commented on code in PR #6680:
URL: https://github.com/apache/hudi/pull/6680#discussion_r979062168


##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -221,60 +329,46 @@ protected HoodieTimeline getActiveTimeline() {
   }
 
   /**
-   * Load all partition paths and it's files under the query table path.
+   * Load partition paths and it's files under the query table path.
    */
-  private Map<PartitionPath, FileStatus[]> loadPartitionPathFiles() {
-    // List files in all partition paths
-    List<PartitionPath> pathToFetch = new ArrayList<>();
-    Map<PartitionPath, FileStatus[]> cachedPartitionToFiles = new HashMap<>();
-
-    // Fetch from the FileStatusCache
-    List<PartitionPath> partitionPaths = getAllQueryPartitionPaths();
-    partitionPaths.forEach(partitionPath -> {
-      Option<FileStatus[]> filesInPartition = 
fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
-      if (filesInPartition.isPresent()) {
-        cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
-      } else {
-        pathToFetch.add(partitionPath);
-      }
-    });
-
-    Map<PartitionPath, FileStatus[]> fetchedPartitionToFiles;
-
-    if (pathToFetch.isEmpty()) {
-      fetchedPartitionToFiles = Collections.emptyMap();
-    } else {
-      Map<String, PartitionPath> fullPartitionPathsMapToFetch = 
pathToFetch.stream()
-          .collect(Collectors.toMap(
-              partitionPath -> 
partitionPath.fullPartitionPath(basePath).toString(),
-              Function.identity())
-          );
-
-      fetchedPartitionToFiles =
-          
getAllFilesInPartitionsUnchecked(fullPartitionPathsMapToFetch.keySet())
-              .entrySet()
-              .stream()
-              .collect(Collectors.toMap(e -> 
fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
-
+  private FileStatus[] loadPartitionPathFiles(PartitionPath partition) {
+    // Try fetch from the FileStatusCache first
+    Option<FileStatus[]> files = 
fileStatusCache.get(partition.fullPartitionPath(basePath));
+    if (files.isPresent()) {
+      return files.get();
     }
 
-    // Update the fileStatusCache
-    fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
-      fileStatusCache.put(partitionPath.fullPartitionPath(basePath), 
filesInPartition);
-    });
+    try {
+      Path path = partition.fullPartitionPath(basePath);
+      FileStatus[] fetchedFiles = tableMetadata.getAllFilesInPartition(path);
 
-    return CollectionUtils.combine(cachedPartitionToFiles, 
fetchedPartitionToFiles);
+      // Update the fileStatusCache
+      fileStatusCache.put(partition.fullPartitionPath(basePath), fetchedFiles);
+      return fetchedFiles;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to list partition path (" + 
partition + ") for a table", e);
+    }
   }
 
   private void doRefresh() {
+    doRefresh(false);
+  }
+  
+  private void doRefresh(boolean initMetadataOnly) {

Review Comment:
   Instead of a boolean, let's split this method into 2:
   
    - One init-ing only MT
    - One also doing file-listing



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);

Review Comment:
   Let's extract any state modifications to a separate method with an explicit 
contract to do so: for example, previously all the loading was done by `loadX` 
methods invoked from `doRefresh`.
   
   We should not be modifying state within getters in this case.



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {

Review Comment:
   Why do we need a separate boolean for that?



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+      FileStatus[] files = loadPartitionPathFiles(p);

Review Comment:
   Let's extract the actual listing to a standalone method



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -187,12 +189,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    * @param predicates     The filter condition.
    * @return The pruned partition paths.
    */
-  protected def prunePartition(partitionPaths: Seq[PartitionPath], predicates: 
Seq[Expression]): Seq[PartitionPath] = {
+  protected def prunePartition(predicates: Seq[Expression]): 
Seq[PartitionPath] = {
     val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
     val partitionPruningPredicates = predicates.filter {
       _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
     }
     if (partitionPruningPredicates.nonEmpty) {
+      val equalPredicate = 
partitionPruningPredicates.filter(_.isInstanceOf[EqualTo])
+      val names = 
equalPredicate.map(_.asInstanceOf[EqualTo]).map(_.left.asInstanceOf[AttributeReference].name).toArray

Review Comment:
   We cannot assume what those predicates might be (e.g., that the left side 
of an `EqualTo` is always an `AttributeReference`)



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+      FileStatus[] files = loadPartitionPathFiles(p);
+      HoodieTimeline activeTimeline = getActiveTimeline();
+      Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+      HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
+
+      validate(activeTimeline, queryInstant);
+
+      List<FileSlice> ret;
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ) && 
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, 
queryInstant.get())
+                    .collect(Collectors.toList())
+            )
+            .orElse(Collections.emptyList());
+      } else {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, 
true)
+            )
+            .orElse(fileSystemView.getLatestFileSlices(p.path))
+            .collect(Collectors.toList());
+      }
+
+      cachedFileSize += 
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+      return ret;
+    });
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, 
String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to 
return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = 
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+    Map<String, Integer> partitionNamesToIdx = IntStream.range(0, 
partitionNames.length)
+        .mapToObj(i -> Pair.of(i, partitionNames[i]))
+        .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+    StringBuilder queryPartitionPath = new StringBuilder();
+    int idx = 0;
+    for (; idx < partitionNames.length; ++idx) {
+      String columnNames = this.partitionColumns[idx];
+      if (partitionNamesToIdx.containsKey(columnNames)) {
+        int k = partitionNamesToIdx.get(columnNames);
+        String value =  urlEncodePartitioning ? 
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+        queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" : 
"").append(value).append("/");
+      } else {
+        break;
+      }
+    }
+    queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+    // Return directly if all partition values are specified.
+    if (idx == this.partitionColumns.length) {

Review Comment:
   We can move this conditional up before the iteration



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+      FileStatus[] files = loadPartitionPathFiles(p);
+      HoodieTimeline activeTimeline = getActiveTimeline();
+      Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+      HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
+
+      validate(activeTimeline, queryInstant);
+
+      List<FileSlice> ret;
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ) && 
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, 
queryInstant.get())
+                    .collect(Collectors.toList())
+            )
+            .orElse(Collections.emptyList());
+      } else {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, 
true)
+            )
+            .orElse(fileSystemView.getLatestFileSlices(p.path))
+            .collect(Collectors.toList());
+      }
+
+      cachedFileSize += 
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+      return ret;
+    });
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, 
String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to 
return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = 
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+    Map<String, Integer> partitionNamesToIdx = IntStream.range(0, 
partitionNames.length)
+        .mapToObj(i -> Pair.of(i, partitionNames[i]))
+        .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+    StringBuilder queryPartitionPath = new StringBuilder();
+    int idx = 0;
+    for (; idx < partitionNames.length; ++idx) {
+      String columnNames = this.partitionColumns[idx];
+      if (partitionNamesToIdx.containsKey(columnNames)) {
+        int k = partitionNamesToIdx.get(columnNames);
+        String value =  urlEncodePartitioning ? 
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+        queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" : 
"").append(value).append("/");
+      } else {
+        break;
+      }
+    }
+    queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+    // Return directly if all partition values are specified.
+    if (idx == this.partitionColumns.length) {
+      return Collections.singletonList(new 
PartitionPath(queryPartitionPath.toString(), 
parsePartitionColumnValues(partitionColumns, queryPartitionPath.toString())));
+    }
+    return 
getQueryPartitionPaths(Collections.singletonList(queryPartitionPath.toString()));
+  }
+
+  private List<PartitionPath> getQueryPartitionPaths(List<String> 
queryRelativePartitionPaths) {
+    List<String> matchedPartitionPaths = queryRelativePartitionPaths.stream()
+        .flatMap(prefix -> {
+          try {
+            // Handle wildcard specially. This will have FileIndex to query 
the table as non-partitioned-table
+            if (prefix.contains("*")) {

Review Comment:
   How could there be a wildcard?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -143,6 +143,13 @@ protected Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKey(String key,
     return recordsByKeys.size() == 0 ? Option.empty() : 
recordsByKeys.get(0).getValue();
   }
 
+  @Override
+  public List<String> getPartitionPathsWithPrefix(String prefix) throws 
IOException {
+    return getAllPartitionPaths().stream()

Review Comment:
   This is going to be incredibly wasteful in the case of the metadata table 
(MT) due to HUDI-4911



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+      FileStatus[] files = loadPartitionPathFiles(p);
+      HoodieTimeline activeTimeline = getActiveTimeline();
+      Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+      HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
+
+      validate(activeTimeline, queryInstant);
+
+      List<FileSlice> ret;
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ) && 
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, 
queryInstant.get())
+                    .collect(Collectors.toList())
+            )
+            .orElse(Collections.emptyList());
+      } else {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, 
true)
+            )
+            .orElse(fileSystemView.getLatestFileSlices(p.path))
+            .collect(Collectors.toList());
+      }
+
+      cachedFileSize += 
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+      return ret;
+    });
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, 
String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to 
return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = 
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+    Map<String, Integer> partitionNamesToIdx = IntStream.range(0, 
partitionNames.length)
+        .mapToObj(i -> Pair.of(i, partitionNames[i]))
+        .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+    StringBuilder queryPartitionPath = new StringBuilder();

Review Comment:
   Let's make it clear that this could be a path's prefix



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+      FileStatus[] files = loadPartitionPathFiles(p);
+      HoodieTimeline activeTimeline = getActiveTimeline();
+      Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+      HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
+
+      validate(activeTimeline, queryInstant);
+
+      List<FileSlice> ret;
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ) && 
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, 
queryInstant.get())
+                    .collect(Collectors.toList())
+            )
+            .orElse(Collections.emptyList());
+      } else {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, 
true)
+            )
+            .orElse(fileSystemView.getLatestFileSlices(p.path))
+            .collect(Collectors.toList());
+      }
+
+      cachedFileSize += 
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+      return ret;
+    });
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, 
String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to 
return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = 
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+    Map<String, Integer> partitionNamesToIdx = IntStream.range(0, 
partitionNames.length)

Review Comment:
   Should be singular (`partitionNameToIdx`)



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+      FileStatus[] files = loadPartitionPathFiles(p);
+      HoodieTimeline activeTimeline = getActiveTimeline();
+      Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+      HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
+
+      validate(activeTimeline, queryInstant);
+
+      List<FileSlice> ret;
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ) && 
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, 
queryInstant.get())
+                    .collect(Collectors.toList())
+            )
+            .orElse(Collections.emptyList());
+      } else {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, 
true)
+            )
+            .orElse(fileSystemView.getLatestFileSlices(p.path))
+            .collect(Collectors.toList());
+      }
+
+      cachedFileSize += 
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+      return ret;
+    });
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, 
String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to 
return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = 
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+    Map<String, Integer> partitionNamesToIdx = IntStream.range(0, 
partitionNames.length)
+        .mapToObj(i -> Pair.of(i, partitionNames[i]))
+        .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+    StringBuilder queryPartitionPath = new StringBuilder();
+    int idx = 0;
+    for (; idx < partitionNames.length; ++idx) {
+      String columnNames = this.partitionColumns[idx];
+      if (partitionNamesToIdx.containsKey(columnNames)) {
+        int k = partitionNamesToIdx.get(columnNames);
+        String value =  urlEncodePartitioning ? 
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+        queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" : 
"").append(value).append("/");

Review Comment:
   If I understood your intention correctly, then there would be the following 
problem: the ordering of the columns in the partition path is fixed, therefore 
if the query specifies partition column predicates which only reference columns 
that do not constitute a prefix of the actual partition path, then this would 
not work. For example, let's say there are 2 partition columns A and B, and the 
path looks like `A=foo/B=bar`, but the query specifies a predicate containing 
only B.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to