alexeykudinkin commented on code in PR #6680:
URL: https://github.com/apache/hudi/pull/6680#discussion_r980388571


##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -139,7 +144,11 @@ public BaseHoodieTableFileIndex(HoodieEngineContext 
engineContext,
     this.engineContext = engineContext;
     this.fileStatusCache = fileStatusCache;
 
-    doRefresh();
+    if (shouldRefresh) {

Review Comment:
   Let's add a comment explaining the difference b/w these branches 
(elaborating on lazy-semantic)



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {

Review Comment:
   nit: We can avoid prefixing fields with `this.` when accessing them; this is 
usually reserved for assigning fields w/in ctors when a local var w/ the same 
name is present in the scope



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
+    loadAllQueryPartitionPaths();
+    return this.cachedAllPartitionPaths;
+  }
+
+  private void loadAllQueryPartitionPaths() {
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, 
this::loadInputFileSlicesOfPartition);
+  }
+
+  private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {
+    FileStatus[] files = loadPartitionPathFiles(p);
+    HoodieTimeline activeTimeline = getActiveTimeline();
+    Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+    HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
+
+    validate(activeTimeline, queryInstant);
+
+    List<FileSlice> ret;
+    if (tableType.equals(HoodieTableType.MERGE_ON_READ) && 
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+      ret = queryInstant.map(instant ->
+              fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, 
queryInstant.get())
+                  .collect(Collectors.toList())
+          )
+          .orElse(Collections.emptyList());
+    } else {
+      ret = queryInstant.map(instant ->
+              fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, 
true)
+          )
+          .orElse(fileSystemView.getLatestFileSlices(p.path))
+          .collect(Collectors.toList());
+    }
+
+    cachedFileSize += 
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+    return ret;
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, 
String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to 
return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = 
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+    Map<String, Integer> partitionNameToIdx = IntStream.range(0, 
partitionNames.length)
+        .mapToObj(i -> Pair.of(i, partitionNames[i]))
+        .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+    StringBuilder queryPartitionPath = new StringBuilder();
+    int idx = 0;
+    for (; idx < partitionNames.length; ++idx) {
+      String columnNames = this.partitionColumns[idx];
+      if (partitionNameToIdx.containsKey(columnNames)) {
+        int k = partitionNameToIdx.get(columnNames);
+        String value =  urlEncodePartitioning ? 
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+        queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" : 
"").append(value).append("/");
+      } else {
+        break;
+      }
+    }
+    queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+    // Return directly if all partition values are specified.
+    if (idx == this.partitionColumns.length) {
+      return Collections.singletonList(new 
PartitionPath(queryPartitionPath.toString(), 
parsePartitionColumnValues(partitionColumns, queryPartitionPath.toString())));
+    }
+    // The predicate forms a prefix of partition path, do listing to the path 
only.
+    return 
getQueryPartitionPaths(Collections.singletonList(queryPartitionPath.toString()));
+  }
+
+  private List<PartitionPath> getQueryPartitionPaths(List<String> 
queryRelativePartitionPaths) {

Review Comment:
   nit: Let's rename this to `listQueryPartitionPath` to make it apparent that 
this will entail an MT/FS listing operation



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
+    loadAllQueryPartitionPaths();
+    return this.cachedAllPartitionPaths;
+  }
+
+  private void loadAllQueryPartitionPaths() {
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, 
this::loadInputFileSlicesOfPartition);
+  }
+
+  private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {
+    FileStatus[] files = loadPartitionPathFiles(p);
+    HoodieTimeline activeTimeline = getActiveTimeline();
+    Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+    HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));

Review Comment:
   Let's create a `Lazy` field for this one (this is not supposed to change for 
a given FileIndex instance)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -165,7 +164,7 @@ case class HoodieFileIndex(spark: SparkSession,
 
       logInfo(s"Total base files: $totalFileSize; " +
         s"candidate files after data skipping : $candidateFileSize; " +
-        s"skipping percent ${if (allFiles.nonEmpty && totalFileSize > 0) 
(totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
+        s"skipping percent ${if (isAllInputFileSlicesCached && 
allFiles.nonEmpty && totalFileSize > 0) (totalFileSize - candidateFileSize) / 
totalFileSize.toDouble else 0}")

Review Comment:
   If we're loading lazily let's instead output `"is disabled"`



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
+    loadAllQueryPartitionPaths();
+    return this.cachedAllPartitionPaths;
+  }
+
+  private void loadAllQueryPartitionPaths() {
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, 
this::loadInputFileSlicesOfPartition);
+  }
+
+  private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {

Review Comment:
   nit: `loadFileSlicesForPartition`



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
+    loadAllQueryPartitionPaths();
+    return this.cachedAllPartitionPaths;
+  }
+
+  private void loadAllQueryPartitionPaths() {
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, 
this::loadInputFileSlicesOfPartition);
+  }
+
+  private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {
+    FileStatus[] files = loadPartitionPathFiles(p);
+    HoodieTimeline activeTimeline = getActiveTimeline();
+    Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+    HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
+
+    validate(activeTimeline, queryInstant);
+
+    List<FileSlice> ret;
+    if (tableType.equals(HoodieTableType.MERGE_ON_READ) && 
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+      ret = queryInstant.map(instant ->
+              fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, 
queryInstant.get())
+                  .collect(Collectors.toList())
+          )
+          .orElse(Collections.emptyList());
+    } else {
+      ret = queryInstant.map(instant ->
+              fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, 
true)
+          )
+          .orElse(fileSystemView.getLatestFileSlices(p.path))
+          .collect(Collectors.toList());
+    }
+
+    cachedFileSize += 
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+    return ret;
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, 
String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to 
return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = 
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());

Review Comment:
   Let's add a tracking task to consolidate this logic w/in key-gens



##########
hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java:
##########
@@ -74,8 +75,13 @@ public List<String> getAllPartitionPaths() throws 
IOException {
       return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, 
datasetBasePath);
     }
 
+    return getPartitionPathsWithPrefix("");
+  }
+
+  @Override
+  public List<String> getPartitionPathsWithPrefix(String prefix) throws 
IOException {
     List<Path> pathsToList = new CopyOnWriteArrayList<>();
-    pathsToList.add(basePath);
+    pathsToList.add(StringUtils.isNullOrEmpty(prefix) ? new 
Path(datasetBasePath) : new Path(datasetBasePath, prefix));

Review Comment:
   This implementation currently assumes that either a) the prefix is "" or b) 
the prefix is an actual relative path (rather than, say, a prefix of a folder 
name), which could be misleading
   
   Instead, i'd suggest we do the following:
   
    - Let's create a separate method 
`getPartitionPathsWithPrefix(Option<String> prefix)`
    - Disallow empty prefixes from being passed in
    - Make sure that the implementation can handle actual prefixes (instead of 
only full relative paths, as it does currently)



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {

Review Comment:
   Let's also add a comment explaining why we're doing full refresh here in 
that case



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {
+      doRefresh();
+    }
+    return cachedAllInputFileSlices;
+  }
+
+  /**
+   * Get input file slice for the given partition. Will use cache directly if 
it is computed before.
+   */
+  protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+    return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+      FileStatus[] files = loadPartitionPathFiles(p);
+      HoodieTimeline activeTimeline = getActiveTimeline();
+      Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+      HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
+
+      validate(activeTimeline, queryInstant);
+
+      List<FileSlice> ret;
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ) && 
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path, 
queryInstant.get())
+                    .collect(Collectors.toList())
+            )
+            .orElse(Collections.emptyList());
+      } else {
+        ret = queryInstant.map(instant ->
+                fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant, 
true)
+            )
+            .orElse(fileSystemView.getLatestFileSlices(p.path))
+            .collect(Collectors.toList());
+      }
+
+      cachedFileSize += 
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+      return ret;
+    });
+  }
+
+  /**
+   * Get partition path with the given partition value
+   * @param partitionNames partition names
+   * @param values partition values
+   * @return partitions that match the given partition values
+   */
+  protected List<PartitionPath> getPartitionPaths(String[] partitionNames, 
String[] values) {
+    if (partitionNames.length == 0 || partitionNames.length != values.length) {
+      LOG.info("The input partition names or value is empty, fallback to 
return all partition paths");
+      return getAllQueryPartitionPaths();
+    }
+
+    if (cachedAllPartitionPaths != null) {
+      LOG.info("All partition paths have already loaded, use it directly");
+      return cachedAllPartitionPaths;
+    }
+
+    boolean hiveStylePartitioning = 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+    boolean urlEncodePartitioning = 
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+    Map<String, Integer> partitionNamesToIdx = IntStream.range(0, 
partitionNames.length)
+        .mapToObj(i -> Pair.of(i, partitionNames[i]))
+        .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+    StringBuilder queryPartitionPath = new StringBuilder();
+    int idx = 0;
+    for (; idx < partitionNames.length; ++idx) {
+      String columnNames = this.partitionColumns[idx];
+      if (partitionNamesToIdx.containsKey(columnNames)) {
+        int k = partitionNamesToIdx.get(columnNames);
+        String value =  urlEncodePartitioning ? 
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+        queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" : 
"").append(value).append("/");
+      } else {
+        break;
+      }
+    }
+    queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+    // Return directly if all partition values are specified.
+    if (idx == this.partitionColumns.length) {

Review Comment:
   We don't need to use `idx` for it (we can use `partitionNames.length`)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -187,12 +189,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    * @param predicates     The filter condition.
    * @return The pruned partition paths.
    */
-  protected def prunePartition(partitionPaths: Seq[PartitionPath], predicates: 
Seq[Expression]): Seq[PartitionPath] = {
+  protected def prunePartition(predicates: Seq[Expression]): 
Seq[PartitionPath] = {

Review Comment:
   Let's rename this to `listMatchingPartitionPaths` (current name is 
misaligned after signature change)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -209,7 +215,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
         s" after partition prune size is: ${prunedPartitionPaths.size}")
       prunedPartitionPaths
     } else {
-      partitionPaths
+      logInfo(s"No partition predicate provided, total partition size is: 
${getAllQueryPartitionPaths.asScala.size}")

Review Comment:
   Let's make sure we add comments elaborating on the lazy and non-lazy listing 
flows



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -323,30 +424,13 @@ private void doRefresh() {
         .mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
         .sum();
 
-    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
-    queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p 
-> p.values.length == 0);

Review Comment:
   Let's actually replace this flag w/ an `isPartitionedTable` predicate (sorry, 
it's my bad that I missed cleaning it up last time)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -138,6 +138,14 @@ object DataSourceReadOptions {
         " read from the data file (in Hudi partition columns are persisted by 
default)." +
         " This config is a fallback allowing to preserve existing behavior, 
and should not be used otherwise.")
 
+  val REFRESH_PARTITION_AND_FILES_IN_INITIALIZATION: ConfigProperty[Boolean] =

Review Comment:
   I don't think we need a config for that, instead:
   
   - Lazy loading should be default behavior
   - We should add a config that can override default behavior and always load 
eagerly (fallback)



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
   }
 
   protected List<PartitionPath> getAllQueryPartitionPaths() {
+    if (this.cachedAllPartitionPaths != null) {
+      return this.cachedAllPartitionPaths;
+    }
+
     List<String> queryRelativePartitionPaths = queryPaths.stream()
         .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
         .collect(Collectors.toList());
 
-    // Load all the partition path from the basePath, and filter by the query 
partition path.
-    // TODO load files from the queryRelativePartitionPaths directly.
-    List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
-        .stream()
-        .filter(path -> 
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+    this.cachedAllPartitionPaths = 
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    this.queryAsNonePartitionedTable = 
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+    return this.cachedAllPartitionPaths;
+  }
+
+  protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+    if (!isAllInputFileSlicesCached) {

Review Comment:
   Understood. Let's rename this flag to actually associate it w/ lazy-loading 
semantic: `shouldRefreshLazily`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to