yihua commented on code in PR #11805:
URL: https://github.com/apache/hudi/pull/11805#discussion_r1744825114


##########
hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java:
##########
@@ -426,6 +426,26 @@ public List<StoragePathInfo> 
listDirectEntries(List<StoragePath> pathList) throw
     return result;
   }
 
+  /**
+   * Lists the file info of the direct files/directories in the given list of 
paths
+   * and filters the results, if the paths are directory.
+   *
+   * @param pathList given path list.
+   * @param filter filter to apply.
+   * @return the list of path info of the files/directories in the given paths.
+   * @throws FileNotFoundException when the path does not exist.
+   * @throws IOException           IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList,
+                                                 StoragePathFilter filter) 
throws IOException {
+    List<StoragePathInfo> result = new ArrayList<>();
+    for (StoragePath path : pathList) {
+      result.addAll(listDirectEntries(path, filter));
+    }
+    return result;

Review Comment:
   Use `pathList.stream().flatMap(...)` here instead of the explicit loop.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -272,17 +279,25 @@ private FileStatus[] 
fetchFilesFromCommitMetadata(HoodieInstant instantToRollbac
    * @param partitionPath
    * @param basePath
    * @param baseFileExtension
-   * @param fs
+   * @param storage
    * @return
    * @throws IOException
    */
-  private FileStatus[] fetchFilesFromListFiles(HoodieInstant 
instantToRollback, String partitionPath, String basePath,
-                                               String baseFileExtension, 
FileSystem fs)
-      throws IOException {
-    SerializablePathFilter pathFilter = 
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
-    Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);
+  private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant 
instantToRollback,
+                                                        String partitionPath,
+                                                        String basePath,
+                                                        String 
baseFileExtension,
+                                                        HoodieStorage storage) 
{
+    StoragePathFilter pathFilter = getPathFilter(baseFileExtension, 
instantToRollback.getTimestamp());
+    List<StoragePath> filePaths = listFilesToBeDeleted(basePath, 
partitionPath);
+
+    try {
+      return storage.listDirectEntries(filePaths, pathFilter);
+    } catch (IOException ioe) {

Review Comment:
   Same question as for `fetchFilesFromCommitMetadata`: should this still throw `IOException` as before, instead of catching it here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -221,49 +216,61 @@ private static String formatDeletePath(String path) {
     return path.substring(path.indexOf(":") + 1);
   }
 
-  private FileStatus[] listBaseFilesToBeDeleted(String commit, String 
basefileExtension, String partitionPath,
-                                                FileSystem fs) throws 
IOException {
+  private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
+                                                         String 
basefileExtension,
+                                                         String partitionPath,
+                                                         HoodieStorage 
storage) throws IOException {
     LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
-    PathFilter filter = (path) -> {
+    StoragePathFilter filter = (path) -> {
       if (path.toString().contains(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
       }
       return false;
     };
-    return 
fs.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(),
 partitionPath), filter);
+    return 
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), filter);
   }
 
-  private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, 
String partitionPath, String basePath,
-                                             String baseFileExtension, 
FileSystem fs,
-                                             Option<HoodieCommitMetadata> 
commitMetadataOptional,
-                                             Boolean isCommitMetadataCompleted,
-                                             HoodieTableType tableType) throws 
IOException {
+  private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant 
instantToRollback,
+                                                      String partitionPath, 
String basePath,
+                                                      String 
baseFileExtension, HoodieStorage storage,
+                                                      
Option<HoodieCommitMetadata> commitMetadataOptional,
+                                                      Boolean 
isCommitMetadataCompleted,
+                                                      HoodieTableType 
tableType) throws IOException {
     // go w/ commit metadata only for COW table. for MOR, we need to get 
associated log files when commit corresponding to base file is rolledback.
     if (isCommitMetadataCompleted && tableType == 
HoodieTableType.COPY_ON_WRITE) {
       return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(),
-          baseFileExtension, fs);
+          baseFileExtension, storage);
     } else {
-      return fetchFilesFromListFiles(instantToRollback, partitionPath, 
basePath, baseFileExtension, fs);
+      return fetchFilesFromListFiles(instantToRollback, partitionPath, 
basePath, baseFileExtension, storage);
     }
   }
 
-  private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback, String partitionPath,
-                                                    String basePath, 
HoodieCommitMetadata commitMetadata,
-                                                    String baseFileExtension, 
FileSystem fs)
-      throws IOException {
-    SerializablePathFilter pathFilter = 
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
-    Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, 
partitionPath);
-
-    return fs.listStatus(Arrays.stream(filePaths).filter(entry -> {
-      try {
-        return fs.exists(entry);
-      } catch (Exception e) {
-        LOG.error("Exists check failed for " + entry.toString(), e);
-      }
-      // if any Exception is thrown, do not ignore. let's try to add the file 
of interest to be deleted. we can't miss any files to be rolled back.
-      return true;
-    }).toArray(Path[]::new), pathFilter);
+  private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback,
+                                                             String 
partitionPath,
+                                                             String basePath,
+                                                             
HoodieCommitMetadata commitMetadata,
+                                                             String 
baseFileExtension,
+                                                             HoodieStorage 
storage) {
+    StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
+        instantToRollback.getTimestamp());
+    List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath, 
commitMetadata, partitionPath)
+        .filter(entry -> {
+          try {
+            return storage.exists(entry);
+          } catch (Exception e) {
+            LOG.error("Exists check failed for " + entry.toString(), e);
+          }
+          // if any Exception is thrown, do not ignore. let's try to add the 
file of interest to be deleted. we can't miss any files to be rolled back.
+          return true;
+        }).collect(Collectors.toList());
+    try {
+      return storage.listDirectEntries(filePaths, pathFilter);
+    } catch (IOException ioe) {
+      LOG.error("Failed to get StoragePathInfo", ioe);
+    }
+
+    return new ArrayList<>();

Review Comment:
   Should this still throw `IOException` as before?



##########
hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java:
##########
@@ -426,6 +426,26 @@ public List<StoragePathInfo> 
listDirectEntries(List<StoragePath> pathList) throw
     return result;
   }
 
+  /**
+   * Lists the file info of the direct files/directories in the given list of 
paths
+   * and filters the results, if the paths are directory.
+   *
+   * @param pathList given path list.
+   * @param filter filter to apply.
+   * @return the list of path info of the files/directories in the given paths.
+   * @throws FileNotFoundException when the path does not exist.
+   * @throws IOException           IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList,

Review Comment:
   Let's add unit tests for this new API in `TestHoodieStorageBase#testListing`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java:
##########
@@ -267,8 +263,9 @@ private static Option<HoodieConsistentHashingMetadata> 
loadMetadataFromGivenFile
    * @param partition Partition metadata file belongs to
    * @return true if hashing metadata file is latest else false
    */
-  private static boolean recommitMetadataFile(HoodieTable table, FileStatus 
metaFile, String partition) {
-    Path partitionPath = new 
Path(FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), 
partition).toUri());
+  private static boolean recommitMetadataFile(HoodieTable table, 
StoragePathInfo metaFile, String partition) {
+    StoragePath partitionPath = new StoragePath(
+        FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), 
partition).toUri());

Review Comment:
   The `new StoragePath(... .toUri())` wrapping seems unnecessary here; can the result be assigned directly?
   ```suggestion
       StoragePath partitionPath = 
FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition);
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java:
##########
@@ -164,10 +163,10 @@ private boolean checkIfExpired() {
   }
 
   private void acquireLock() {
-    try (FSDataOutputStream fos = fs.create(this.lockFile, false)) {
-      if (!fs.exists(this.lockFile)) {
+    try (OutputStream os = storage.create(this.lockFile, false)) {
+      if (!storage.exists(this.lockFile)) {
         initLockInfo();
-        fos.writeBytes(lockInfo.toString());
+        os.write(lockInfo.toString().getBytes());

Review Comment:
   Use `StringUtils#getUTF8Bytes` instead of `lockInfo.toString().getBytes()`, which encodes with the default charset rather than an explicit UTF-8.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to