nsivabalan commented on code in PR #7690:
URL: https://github.com/apache/hudi/pull/7690#discussion_r1084652972


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java:
##########
@@ -84,15 +84,32 @@ public HoodieSavepointMetadata execute() {
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
 
       context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
-      List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
-      Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
-        // Scan all partitions files with this commit time
-        LOG.info("Collecting latest files in partition path " + partitionPath);
-        TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
-        List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-            .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-        return new ImmutablePair<>(partitionPath, latestFiles);
-      }, null);
+      TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+
+      Map<String, List<String>> latestFilesMap;
+      // NOTE: for performance, we have to use different logic here for listing the latest files
+      // before or on the given instant:
+      // (1) using metadata-table-based file listing: instead of parallelizing the partition
+      // listing which incurs unnecessary metadata table reads, we directly read the metadata
+      // table once in a batch manner through the timeline server;
+      // (2) using direct file system listing:  we parallelize the partition listing so that
+      // each partition can be listed on the file system concurrently through Spark.
+      if (config.getMetadataConfig().enabled()) {

Review Comment:
   There are likely other code paths in Hudi where we can employ similar optimizations. Can we file a follow-up ticket to triage and tackle them?
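The NOTE in the diff describes two strategies for collecting the latest files before or on an instant. A minimal sketch of that dispatch, assuming two hypothetical listers (`BatchLister`, `PartitionLister`) that are not Hudi APIs, and using a Java parallel stream as a stand-in for Spark's per-partition parallelism:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch of the two listing strategies in the diff's NOTE.
 * BatchLister and PartitionLister are illustrative interfaces, not Hudi APIs.
 */
public class LatestFilesListingSketch {

  /** Hypothetical batched source: one call returns latest files for all partitions. */
  public interface BatchLister {
    Map<String, List<String>> listAllLatestFiles(String instantTime);
  }

  /** Hypothetical per-partition source: one call lists one partition. */
  public interface PartitionLister {
    List<String> listLatestFiles(String partitionPath, String instantTime);
  }

  public static Map<String, List<String>> latestFilesBeforeOrOn(
      boolean metadataEnabled,
      List<String> partitions,
      String instantTime,
      BatchLister batchLister,
      PartitionLister partitionLister) {
    if (metadataEnabled) {
      // (1) Metadata-style listing: a single batched read instead of one read per partition.
      return batchLister.listAllLatestFiles(instantTime);
    }
    // (2) Direct FS-style listing: list partitions concurrently.
    // A parallel stream stands in for Spark here (assumption for the sketch).
    return partitions.parallelStream()
        .collect(Collectors.toConcurrentMap(
            Function.identity(),
            partition -> partitionLister.listLatestFiles(partition, instantTime)));
  }
}
```

The sketch only shows the control flow; whether the batched path is cheaper in practice depends on the cost of metadata table reads.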



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java:
##########
@@ -157,8 +156,8 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType in
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
-            .withRemoteServerPort(timelineServicePort)
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+            .withRemoteServerPort(timelineServicePort).build());
+    //.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)

Review Comment:
   Why is this commented out?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -81,6 +86,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
   private static final Logger LOG = LogManager.getLogger(AbstractTableFileSystemView.class);
 
   protected HoodieTableMetaClient metaClient;
+  // TODO: to pass in the actual config
+  protected boolean assumeDatePartitioning = false;

Review Comment:
   Please file a tracking ticket.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -550,6 +674,28 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
     }
   }
 
+  @Override

Review Comment:
   Should we override this from within MetadataFileSystemView and throw from within AbstractTableFileSystemView?
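A minimal sketch of this suggestion, assuming the all-partitions API can simply be rejected for direct file-system listing: the base view throws `UnsupportedOperationException` and only a metadata-backed view overrides it. `BaseFileSystemView`, `DirectListingView`, and `MetadataBackedView` are hypothetical stand-ins, not Hudi's actual view classes.

```java
import java.util.List;

/**
 * Hypothetical sketch: the base view rejects the "all partitions" API,
 * and only the metadata-backed view overrides it. Names are illustrative.
 */
public class ViewOverrideSketch {

  public abstract static class BaseFileSystemView {
    /** Direct file-system views do not support listing every partition at once. */
    public List<String> getAllPartitionPaths() {
      throw new UnsupportedOperationException(
          "Listing all partitions is only supported by the metadata-table-backed view");
    }
  }

  /** Direct file-system listing: inherits the throwing default. */
  public static class DirectListingView extends BaseFileSystemView {
  }

  /** Metadata-table-backed listing: serves all partitions in one batch. */
  public static class MetadataBackedView extends BaseFileSystemView {
    private final List<String> partitionsFromMetadata;

    public MetadataBackedView(List<String> partitionsFromMetadata) {
      this.partitionsFromMetadata = partitionsFromMetadata;
    }

    @Override
    public List<String> getAllPartitionPaths() {
      return partitionsFromMetadata;
    }
  }
}
```

With this shape, an FS-based view cannot reach the batch path by accident; it fails fast instead of falling back to a slow sequential listing.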



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
    */
   protected abstract void resetViewState();
 
+  /**
+   * Batch loading all the partitions if needed.
+   *
+   * @return A list of relative partition paths of all partitions.
+   */
+  private List<String> ensureAllPartitionsLoadedCorrectly() {

Review Comment:
   I left a message below. We should keep this within MetadataFSV so that FS-based listing cannot call into these by mistake.



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -319,6 +320,68 @@ public static List<String> getAllPartitionPaths(HoodieEngineContext engineContex
     }
   }
 
+  /**
+   * Gets all partition paths in a table, based on a relative partition path prefix.
+   *
+   * @param engineContext {@link HoodieEngineContext} instance for file listing.
+   * @param hadoopConf    Hadoop configuration.
+   * @param basePath      Base path of the table.
+   * @param prefix        Relative partition path prefix for listing.
+   * @param parallelism   Listing parallelism to use if the engine context supports it.
+   * @return A list of  all partition paths with the prefix.
+   * @throws IOException upon error.
+   */
+  public static List<String> getPartitionPathsWithPrefix(HoodieEngineContext engineContext,

Review Comment:
   We already have `FSUtils.getAllPartitionPaths(engineContext, basePath, false, cfg.assumeDatePartitioning)`. I don't think calling the prefix-based API is the right thing to do here.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
    */
   protected abstract void resetViewState();
 
+  /**
+   * Batch loading all the partitions if needed.
+   *
+   * @return A list of relative partition paths of all partitions.
+   */
+  private List<String> ensureAllPartitionsLoadedCorrectly() {
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+    try {
+      List<String> formattedPartitionList = getAllPartitionPaths().stream()
+          .map(this::formatPartitionKey).collect(Collectors.toList());
+      ensurePartitionsLoadedCorrectly(formattedPartitionList);
+      return formattedPartitionList;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get all partition paths", e);
+    }
+  }
+
+  /**
+   * Allows lazily loading the partitions if needed.
+   *
+   * @param partitionList list of partitions to be loaded if not present.
+   */
+  private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+    Set<String> partitionSet = new HashSet<>();
+    partitionList.forEach(partition ->
+        addedPartitions.computeIfAbsent(partition, partitionPathStr -> {
+          if (!isPartitionAvailableInStore(partitionPathStr)) {
+            partitionSet.add(partitionPathStr);
+          }
+          return true;
+        })
+    );
+
+    if (!partitionSet.isEmpty()) {
+      long beginTs = System.currentTimeMillis();
+      // Not loaded yet
+      try {
+        LOG.info("Building file system view for partitions " + partitionSet);
+
+        // Pairs of relative partition path and absolute partition path
+        List<Pair<String, Path>> absolutePartitionPathList = partitionSet.stream()
+            .map(partition -> Pair.of(
+                partition, FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+            .collect(Collectors.toList());
+        long beginLsTs = System.currentTimeMillis();
+        Map<Pair<String, Path>, FileStatus[]> statusesMap =
+            listPartitions(absolutePartitionPathList);
+        long endLsTs = System.currentTimeMillis();
+        LOG.debug("Time taken to list partitions " + partitionSet + " =" + (endLsTs - beginLsTs));
+        statusesMap.forEach((partitionPair, statuses) -> {
+          String relativePartitionStr = partitionPair.getLeft();
+          List<HoodieFileGroup> groups = addFilesToView(statuses);
+          if (groups.isEmpty()) {
+            storePartitionView(relativePartitionStr, new ArrayList<>());
+          }
+          LOG.debug("#files found in partition (" + relativePartitionStr + ") =" + statuses.length);
+        });
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to list base files in partitions " + partitionSet, e);
+      }
+      long endTs = System.currentTimeMillis();
+      LOG.debug("Time to load partition " + partitionSet + " =" + (endTs - beginTs));
+    }
+  }
+
+  /***
+   * @return A list of relative partition paths of all partitions.
+   * @throws IOException upon error.
+   */
+  protected List<String> getAllPartitionPaths() throws IOException {

Review Comment:
   Let's not support these APIs for FS-based listing.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -550,6 +674,28 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
     }
   }
 
+  @Override
+  public final Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    try {
+      readLock.lock();
+
+      List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
+      return formattedPartitionList.stream().collect(Collectors.toMap(
+          Function.identity(),
+          partitionPath -> fetchAllStoredFileGroups(partitionPath)
+              .filter(fileGroup -> !isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))

Review Comment:
   Is it possible to reuse `getLatestBaseFilesBeforeOrOn(String partitionStr, String maxCommitTime)` to avoid code duplication? If we make a bug fix in the other method, we might forget to fix it here.
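A sketch of the reuse, assuming the all-partitions method only needs to fan out over the loaded partitions so the per-partition method keeps the only copy of the filtering logic. The toy model (one file group per partition, base files identified by commit time) and the class `ReuseSketch` are hypothetical; in the real view, delegating while already holding `readLock` is only safe if that lock is reentrant.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: the all-partitions variant delegates to the per-partition one. */
public class ReuseSketch {

  // Toy model (hypothetical): partition -> commit times of the base files in its single file group.
  private final Map<String, List<String>> baseFileCommitsByPartition;

  public ReuseSketch(Map<String, List<String>> baseFileCommitsByPartition) {
    this.baseFileCommitsByPartition = baseFileCommitsByPartition;
  }

  /** Single source of truth for "latest base file before or on" within one partition. */
  public Stream<String> latestBaseFilesBeforeOrOn(String partition, String maxCommitTime) {
    return baseFileCommitsByPartition.get(partition).stream()
        .filter(commit -> commit.compareTo(maxCommitTime) <= 0)
        .max(Comparator.naturalOrder())
        .map(Stream::of)
        .orElseGet(Stream::empty);
  }

  /** The all-partitions variant only fans out; it adds no filtering of its own. */
  public Map<String, Stream<String>> allLatestBaseFilesBeforeOrOn(String maxCommitTime) {
    return baseFileCommitsByPartition.keySet().stream().collect(Collectors.toMap(
        Function.identity(),
        partition -> latestBaseFilesBeforeOrOn(partition, maxCommitTime)));
  }
}
```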



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -596,8 +742,8 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
       return fetchAllStoredFileGroups()
           .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
           .map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
-          fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
-              && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
+              fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())

Review Comment:
   Can you please revert the unintentional changes? That will make this easier to review.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
    */
   protected abstract void resetViewState();
 
+  /**
+   * Batch loading all the partitions if needed.
+   *
+   * @return A list of relative partition paths of all partitions.
+   */
+  private List<String> ensureAllPartitionsLoadedCorrectly() {
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+    try {
+      List<String> formattedPartitionList = getAllPartitionPaths().stream()
+          .map(this::formatPartitionKey).collect(Collectors.toList());
+      ensurePartitionsLoadedCorrectly(formattedPartitionList);
+      return formattedPartitionList;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get all partition paths", e);
+    }
+  }
+
+  /**
+   * Allows lazily loading the partitions if needed.
+   *
+   * @param partitionList list of partitions to be loaded if not present.
+   */
+  private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+    Set<String> partitionSet = new HashSet<>();
+    partitionList.forEach(partition ->
+        addedPartitions.computeIfAbsent(partition, partitionPathStr -> {

Review Comment:
   We should add entries to `addedPartitions` only once the file groups for a particular partition are already loaded, since there could be concurrent access to this map. In this implementation, however, we first add the entries to the map and only later list the files and populate the file groups. We might need to fix that.
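One way to address this, sketched under stated assumptions: collect the missing partitions without touching `addedPartitions`, list and store their file groups, and only then mark them as added. `PublishAfterLoadSketch` and its `batchLister` function are hypothetical, and a `synchronized` method is used as the simplest way to keep two batch loads from listing the same partition twice.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: mark partitions as added only after their file groups are stored.
 * Names are illustrative; this is not Hudi's AbstractTableFileSystemView.
 */
public class PublishAfterLoadSketch {

  private final Map<String, Boolean> addedPartitions = new ConcurrentHashMap<>();
  private final Map<String, List<String>> fileGroupsByPartition = new ConcurrentHashMap<>();
  // Hypothetical batch lister: partitions -> (partition -> file group ids).
  private final Function<Set<String>, Map<String, List<String>>> batchLister;

  public PublishAfterLoadSketch(Function<Set<String>, Map<String, List<String>>> batchLister) {
    this.batchLister = batchLister;
  }

  /** Serialized so two callers never list the same missing partition twice. */
  public synchronized void ensurePartitionsLoaded(List<String> partitions) {
    // 1. Find partitions that are not loaded yet, without marking them.
    Set<String> missing = partitions.stream()
        .filter(partition -> !addedPartitions.containsKey(partition))
        .collect(Collectors.toSet());
    if (missing.isEmpty()) {
      return;
    }
    // 2. List files and store file groups first (empty list for empty partitions).
    Map<String, List<String>> listed = batchLister.apply(missing);
    for (String partition : missing) {
      fileGroupsByPartition.put(partition, listed.getOrDefault(partition, Collections.emptyList()));
    }
    // 3. Only now publish the partitions as loaded.
    for (String partition : missing) {
      addedPartitions.put(partition, Boolean.TRUE);
    }
  }

  public boolean isLoaded(String partition) {
    return addedPartitions.containsKey(partition);
  }

  public List<String> fileGroups(String partition) {
    return fileGroupsByPartition.getOrDefault(partition, Collections.emptyList());
  }
}
```

In `ConcurrentHashMap`, an update to a key happens-before any retrieval of that key that observes it, so a reader that sees a partition in `addedPartitions` also sees the file groups stored just before it. The cost of this simple version is that all batch loads are serialized.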
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
    */
   protected abstract void resetViewState();
 
+  /**
+   * Batch loading all the partitions if needed.
+   *
+   * @return A list of relative partition paths of all partitions.
+   */
+  private List<String> ensureAllPartitionsLoadedCorrectly() {
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+    try {
+      List<String> formattedPartitionList = getAllPartitionPaths().stream()
+          .map(this::formatPartitionKey).collect(Collectors.toList());
+      ensurePartitionsLoadedCorrectly(formattedPartitionList);
+      return formattedPartitionList;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get all partition paths", e);
+    }
+  }
+
+  /**
+   * Allows lazily loading the partitions if needed.
+   *
+   * @param partitionList list of partitions to be loaded if not present.
+   */
+  private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+    Set<String> partitionSet = new HashSet<>();
+    partitionList.forEach(partition ->
+        addedPartitions.computeIfAbsent(partition, partitionPathStr -> {
+          if (!isPartitionAvailableInStore(partitionPathStr)) {
+            partitionSet.add(partitionPathStr);
+          }
+          return true;
+        })
+    );
+
+    if (!partitionSet.isEmpty()) {
+      long beginTs = System.currentTimeMillis();
+      // Not loaded yet
+      try {
+        LOG.info("Building file system view for partitions " + partitionSet);
+
+        // Pairs of relative partition path and absolute partition path
+        List<Pair<String, Path>> absolutePartitionPathList = partitionSet.stream()
+            .map(partition -> Pair.of(
+                partition, FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+            .collect(Collectors.toList());
+        long beginLsTs = System.currentTimeMillis();
+        Map<Pair<String, Path>, FileStatus[]> statusesMap =
+            listPartitions(absolutePartitionPathList);
+        long endLsTs = System.currentTimeMillis();
+        LOG.debug("Time taken to list partitions " + partitionSet + " =" + (endLsTs - beginLsTs));
+        statusesMap.forEach((partitionPair, statuses) -> {
+          String relativePartitionStr = partitionPair.getLeft();
+          List<HoodieFileGroup> groups = addFilesToView(statuses);
+          if (groups.isEmpty()) {
+            storePartitionView(relativePartitionStr, new ArrayList<>());
+          }
+          LOG.debug("#files found in partition (" + relativePartitionStr + ") =" + statuses.length);
+        });
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to list base files in partitions " + partitionSet, e);
+      }
+      long endTs = System.currentTimeMillis();
+      LOG.debug("Time to load partition " + partitionSet + " =" + (endTs - beginTs));
+    }
+  }
+
+  /***
+   * @return A list of relative partition paths of all partitions.
+   * @throws IOException upon error.
+   */
+  protected List<String> getAllPartitionPaths() throws IOException {
+    // TODO: integrate the direct FS listing with the actual engine context
+    LOG.warn("Getting all partition paths with file system listing sequentially can be very slow. "
+        + "This should not be invoked.");
+    String bathPath = metaClient.getBasePathV2().toString();
+    FileSystem fs = new Path(bathPath).getFileSystem(metaClient.getHadoopConf());
+    if (assumeDatePartitioning) {
+      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, bathPath);
+    }
+    return FSUtils.getPartitionPathsWithPrefix(

Review Comment:
   Calling prefix-based APIs to fetch all partition paths does not look right. Let's see how we can fix this properly.


