This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bb8dcdf116 [HUDI-8092] Replace FileSystem and related classes to 
dehadoop hudi-client-common (#11805)
9bb8dcdf116 is described below

commit 9bb8dcdf116f4d2b916aebe19adebb673131c43f
Author: Shawn Chang <[email protected]>
AuthorDate: Sun Sep 8 19:58:16 2024 -0700

    [HUDI-8092] Replace FileSystem and related classes to dehadoop 
hudi-client-common (#11805)
    
    Co-authored-by: Shawn Chang <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../DirectMarkerTransactionManager.java            |   7 +-
 .../client/transaction/TransactionManager.java     |   3 +-
 .../lock/FileSystemBasedLockProvider.java          |  43 +++----
 .../hudi/client/transaction/lock/LockManager.java  |  14 +-
 .../index/bucket/ConsistentBucketIndexUtils.java   |  59 +++++----
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  12 +-
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   |  19 ++-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |   4 +-
 .../java/org/apache/hudi/io/HoodieIOHandle.java    |   4 -
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   4 +-
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |   4 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   5 +-
 .../table/action/bootstrap/BootstrapUtils.java     |  80 ++++++------
 .../table/action/clean/CleanActionExecutor.java    |  25 ++--
 .../table/action/rollback/BaseRollbackHelper.java  |   9 +-
 .../rollback/ListingBasedRollbackStrategy.java     | 142 +++++++++++----------
 .../rollback/MarkerBasedRollbackStrategy.java      |  10 +-
 .../action/rollback/SerializablePathFilter.java    |  26 ----
 .../hudi/table/marker/DirectWriteMarkers.java      |  28 ++--
 ...nsactionDirectMarkerBasedDetectionStrategy.java |   4 +-
 .../org/apache/hudi/table/repair/RepairUtils.java  |   9 --
 .../table/upgrade/FourToFiveUpgradeHandler.java    |  10 +-
 .../hudi/table/upgrade/UpgradeDowngrade.java       |   9 +-
 .../org/apache/hudi/HoodieTestCommitGenerator.java |  27 ++--
 .../FileSystemBasedLockProviderTestClass.java      |  27 ++--
 .../hudi/client/transaction/TestLockManager.java   |   4 +-
 .../apache/hudi/table/repair/TestRepairUtils.java  |   3 +-
 .../hudi/utils/HoodieWriterClientTestHarness.java  |   3 +-
 .../FlinkMergeAndReplaceHandleWithChangeLog.java   |   3 +-
 .../hudi/io/FlinkMergeHandleWithChangeLog.java     |   3 +-
 .../SparkBootstrapCommitActionExecutor.java        |  10 +-
 .../table/action/bootstrap/TestBootstrapUtils.java |   5 +-
 .../table/functional/TestCleanActionExecutor.java  |  40 +++---
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  27 ++++
 .../apache/hudi/hadoop/fs/SerializablePath.java    |  71 -----------
 .../hudi/storage/hadoop/HoodieHadoopStorage.java   |  12 ++
 .../org/apache/hudi/storage/HoodieStorage.java     |  20 +++
 .../hudi/io/storage/TestHoodieStorageBase.java     |  35 +++++
 .../org/apache/hudi/functional/TestBootstrap.java  |   8 +-
 .../apache/hudi/functional/TestOrcBootstrap.java   |  11 +-
 40 files changed, 394 insertions(+), 445 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index aa99ca63ede..00c27127a50 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -27,8 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.storage.HoodieStorage;
 
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 
@@ -41,8 +40,8 @@ import static 
org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 public class DirectMarkerTransactionManager extends TransactionManager {
   private final String filePath;
 
-  public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem 
fs, String partitionPath, String fileId) {
-    super(new LockManager(config, fs, createUpdatedLockProps(config, 
partitionPath, fileId)), config.isLockRequired());
+  public DirectMarkerTransactionManager(HoodieWriteConfig config, 
HoodieStorage storage, String partitionPath, String fileId) {
+    super(new LockManager(config, storage, createUpdatedLockProps(config, 
partitionPath, fileId)), config.isLockRequired());
     this.filePath = partitionPath + "/" + fileId;
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index c02ed4a171c..b725243fca0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.storage.HoodieStorage;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +42,7 @@ public class TransactionManager implements Serializable {
   private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
-    this(new LockManager(config, (FileSystem) storage.getFileSystem()), 
config.isLockRequired());
+    this(new LockManager(config, storage), config.isLockRequired());
   }
 
   protected TransactionManager(LockManager lockManager, boolean 
isLockRequired) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
index 73f14355ca9..8c0bc8842b9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
@@ -32,19 +32,18 @@ import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieLockException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StorageSchemes;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -65,8 +64,8 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
   private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBasedLockProvider.class);
   private static final String LOCK_FILE_NAME = "lock";
   private final int lockTimeoutMinutes;
-  private final transient FileSystem fs;
-  private final transient Path lockFile;
+  private final transient HoodieStorage storage;
+  private final transient StoragePath lockFile;
   protected LockConfiguration lockConfiguration;
   private SimpleDateFormat sdf;
   private LockInfo lockInfo;
@@ -81,13 +80,13 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
           + StoragePath.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
     }
     this.lockTimeoutMinutes = 
lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
-    this.lockFile = new Path(lockDirectory + StoragePath.SEPARATOR + 
LOCK_FILE_NAME);
+    this.lockFile = new StoragePath(lockDirectory + StoragePath.SEPARATOR + 
LOCK_FILE_NAME);
     this.lockInfo = new LockInfo();
     this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-    this.fs = HadoopFSUtils.getFs(this.lockFile.toString(), configuration);
+    this.storage = HoodieStorageUtils.getStorage(this.lockFile.toString(), 
configuration);
     List<String> customSupportedFSs = 
lockConfiguration.getConfig().getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(),
 ",", new ArrayList<>());
-    if (!customSupportedFSs.contains(this.fs.getScheme()) && 
!StorageSchemes.isAtomicCreationSupported(this.fs.getScheme())) {
-      throw new HoodieLockException("Unsupported scheme :" + 
this.fs.getScheme() + ", since this fs can not support atomic creation");
+    if (!customSupportedFSs.contains(this.storage.getScheme()) && 
!StorageSchemes.isAtomicCreationSupported(this.storage.getScheme())) {
+      throw new HoodieLockException("Unsupported scheme :" + 
this.storage.getScheme() + ", since this fs can not support atomic creation");
     }
   }
 
@@ -95,7 +94,7 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
   public void close() {
     synchronized (LOCK_FILE_NAME) {
       try {
-        fs.delete(this.lockFile, true);
+        storage.deleteFile(this.lockFile);
       } catch (IOException e) {
         throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
       }
@@ -107,9 +106,9 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
     try {
       synchronized (LOCK_FILE_NAME) {
         // Check whether lock is already expired, if so try to delete lock file
-        if (fs.exists(this.lockFile)) {
+        if (storage.exists(this.lockFile)) {
           if (checkIfExpired()) {
-            fs.delete(this.lockFile, true);
+            storage.deleteFile(this.lockFile);
             LOG.warn("Delete expired lock file: " + this.lockFile);
           } else {
             reloadCurrentOwnerLockInfo();
@@ -117,7 +116,7 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
           }
         }
         acquireLock();
-        return fs.exists(this.lockFile);
+        return storage.exists(this.lockFile);
       }
     } catch (IOException | HoodieIOException e) {
       LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
@@ -129,8 +128,8 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
   public void unlock() {
     synchronized (LOCK_FILE_NAME) {
       try {
-        if (fs.exists(this.lockFile)) {
-          fs.delete(this.lockFile, true);
+        if (storage.exists(this.lockFile)) {
+          storage.deleteFile(this.lockFile);
         }
       } catch (IOException io) {
         throw new 
HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
@@ -153,7 +152,7 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
       return false;
     }
     try {
-      long modificationTime = 
fs.getFileStatus(this.lockFile).getModificationTime();
+      long modificationTime = 
storage.getPathInfo(this.lockFile).getModificationTime();
       if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 
60 * 1000L) {
         return true;
       }
@@ -164,10 +163,10 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
   }
 
   private void acquireLock() {
-    try (FSDataOutputStream fos = fs.create(this.lockFile, false)) {
-      if (!fs.exists(this.lockFile)) {
+    try (OutputStream os = storage.create(this.lockFile, false)) {
+      if (!storage.exists(this.lockFile)) {
         initLockInfo();
-        fos.writeBytes(lockInfo.toString());
+        os.write(StringUtils.getUTF8Bytes(lockInfo.toString()));
       }
     } catch (IOException e) {
       throw new 
HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
@@ -181,8 +180,8 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
   }
 
   public void reloadCurrentOwnerLockInfo() {
-    try (InputStream is = fs.open(this.lockFile)) {
-      if (fs.exists(this.lockFile)) {
+    try (InputStream is = storage.open(this.lockFile)) {
+      if (storage.exists(this.lockFile)) {
         this.currentOwnerLockInfo = FileIOUtils.readAsUTFString(is);
       } else {
         this.currentOwnerLockInfo = "";
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index 39df3c3fe43..57ed6df45af 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -28,11 +28,9 @@ import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieLockException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,19 +56,19 @@ public class LockManager implements Serializable, 
AutoCloseable {
   private transient HoodieLockMetrics metrics;
   private volatile LockProvider lockProvider;
 
-  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
-    this(writeConfig, fs, writeConfig.getProps());
+  public LockManager(HoodieWriteConfig writeConfig, HoodieStorage storage) {
+    this(writeConfig, storage, writeConfig.getProps());
   }
 
-  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, 
TypedProperties lockProps) {
+  public LockManager(HoodieWriteConfig writeConfig, HoodieStorage storage, 
TypedProperties lockProps) {
     this.writeConfig = writeConfig;
-    this.storageConf = HadoopFSUtils.getStorageConfWithCopy(fs.getConf());
+    this.storageConf = storage.getConf().newInstance();
     this.lockConfiguration = new LockConfiguration(lockProps);
     maxRetries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
         
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
     maxWaitTimeInMs = 
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
         
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
-    metrics = new HoodieLockMetrics(writeConfig, new HoodieHadoopStorage(fs));
+    metrics = new HoodieLockMetrics(writeConfig, storage);
     lockRetryHelper = new RetryHelper<>(maxWaitTimeInMs, maxRetries, 
maxWaitTimeInMs,
         Arrays.asList(HoodieLockException.class, InterruptedException.class), 
"acquire lock");
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index 453f08a486a..1cd5d110de5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -29,14 +29,11 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +42,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -58,7 +54,6 @@ import static 
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHI
 import static 
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
 import static 
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Utilities class for consistent bucket index metadata management.
@@ -110,30 +105,31 @@ public class ConsistentBucketIndexUtils {
    */
   public static Option<HoodieConsistentHashingMetadata> 
loadMetadata(HoodieTable table, String partition) {
     HoodieTableMetaClient metaClient = table.getMetaClient();
-    Path metadataPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(),
 partition);
-    Path partitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePath().toString(),
 partition);
+    StoragePath metadataPath = 
FSUtils.constructAbsolutePath(metaClient.getHashingMetadataPath(), partition);
+    StoragePath partitionPath = 
FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition);
     try {
-      Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
-        String filename = fileStatus.getPath().getName();
+      Predicate<StoragePathInfo> hashingMetaCommitFilePredicate = pathInfo -> {
+        String filename = pathInfo.getPath().getName();
         return 
filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX);
       };
-      Predicate<FileStatus> hashingMetadataFilePredicate = fileStatus -> {
-        String filename = fileStatus.getPath().getName();
+      Predicate<StoragePathInfo> hashingMetadataFilePredicate = pathInfo -> {
+        String filename = pathInfo.getPath().getName();
         return filename.contains(HASHING_METADATA_FILE_SUFFIX);
       };
-      final FileStatus[] metaFiles =
-          ((FileSystem) 
metaClient.getStorage().getFileSystem()).listStatus(metadataPath);
-      final TreeSet<String> commitMetaTss = 
Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
+      final List<StoragePathInfo> metaFiles = 
metaClient.getStorage().listDirectEntries(metadataPath);
+      final TreeSet<String> commitMetaTss = 
metaFiles.stream().filter(hashingMetaCommitFilePredicate)
           .map(commitFile -> 
HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
           .sorted()
           .collect(Collectors.toCollection(TreeSet::new));
-      final FileStatus[] hashingMetaFiles = 
Arrays.stream(metaFiles).filter(hashingMetadataFilePredicate)
+      final List<StoragePathInfo> hashingMetaFiles = 
metaFiles.stream().filter(hashingMetadataFilePredicate)
           .sorted(Comparator.comparing(f -> f.getPath().getName()))
-          .toArray(FileStatus[]::new);
+          .collect(Collectors.toList());
       // max committed metadata file
       final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null : 
commitMetaTss.last();
       // max updated metadata file
-      FileStatus maxMetadataFile = hashingMetaFiles.length > 0 ? 
hashingMetaFiles[hashingMetaFiles.length - 1] : null;
+      StoragePathInfo maxMetadataFile = hashingMetaFiles.isEmpty()
+              ? null
+              : hashingMetaFiles.get(hashingMetaFiles.size() - 1);
       // If single file present in metadata and if its default file return it
       if (maxMetadataFile != null && 
HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS))
 {
         return loadMetadataFromGivenFile(table, maxMetadataFile);
@@ -146,9 +142,9 @@ public class ConsistentBucketIndexUtils {
       HoodieTimeline completedCommits = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
 
       // fix the in-consistency between un-committed and committed hashing 
metadata files.
-      List<FileStatus> fixed = new ArrayList<>();
-      Arrays.stream(hashingMetaFiles).forEach(hashingMetaFile -> {
-        Path path = hashingMetaFile.getPath();
+      List<StoragePathInfo> fixed = new ArrayList<>();
+      hashingMetaFiles.forEach(hashingMetaFile -> {
+        StoragePath path = hashingMetaFile.getPath();
         String timestamp = 
HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
         if (maxCommitMetaFileTs != null && 
timestamp.compareTo(maxCommitMetaFileTs) <= 0) {
           // only fix the metadata with greater timestamp than max committed 
timestamp
@@ -206,14 +202,14 @@ public class ConsistentBucketIndexUtils {
    * Creates commit marker corresponding to hashing metadata file after post 
commit clustering operation.
    *
    * @param table         Hoodie table
-   * @param fileStatus    File for which commit marker should be created
+   * @param path          File for which commit marker should be created
    * @param partitionPath Partition path the file belongs to
    * @throws IOException
    */
-  private static void createCommitMarker(HoodieTable table, Path fileStatus, 
Path partitionPath) throws IOException {
+  private static void createCommitMarker(HoodieTable table, StoragePath path, 
StoragePath partitionPath) throws IOException {
     HoodieStorage storage = table.getStorage();
-    StoragePath fullPath = new StoragePath(convertToStoragePath(partitionPath),
-        getTimestampFromFile(fileStatus.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    StoragePath fullPath = new StoragePath(partitionPath,
+        getTimestampFromFile(path.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
     if (storage.exists(fullPath)) {
       return;
     }
@@ -236,11 +232,11 @@ public class ConsistentBucketIndexUtils {
    * @param metaFile Hashing metadata file
    * @return HoodieConsistentHashingMetadata object
    */
-  private static Option<HoodieConsistentHashingMetadata> 
loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
+  private static Option<HoodieConsistentHashingMetadata> 
loadMetadataFromGivenFile(HoodieTable table, StoragePathInfo metaFile) {
     if (metaFile == null) {
       return Option.empty();
     }
-    try (InputStream is = 
table.getStorage().open(convertToStoragePath(metaFile.getPath()))) {
+    try (InputStream is = table.getStorage().open(metaFile.getPath())) {
       byte[] content = FileIOUtils.readAsByteArray(is);
       return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
     } catch (FileNotFoundException e) {
@@ -267,8 +263,8 @@ public class ConsistentBucketIndexUtils {
    * @param partition Partition metadata file belongs to
    * @return true if hashing metadata file is latest else false
    */
-  private static boolean recommitMetadataFile(HoodieTable table, FileStatus 
metaFile, String partition) {
-    Path partitionPath = new 
Path(FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), 
partition).toUri());
+  private static boolean recommitMetadataFile(HoodieTable table, 
StoragePathInfo metaFile, String partition) {
+    StoragePath partitionPath = 
FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition);
     String timestamp = getTimestampFromFile(metaFile.getPath().getName());
     if (table.getPendingCommitsTimeline().containsInstant(timestamp)) {
       return false;
@@ -279,7 +275,10 @@ public class ConsistentBucketIndexUtils {
     }
     HoodieConsistentHashingMetadata hoodieConsistentHashingMetadata = 
hoodieConsistentHashingMetadataOption.get();
 
-    Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile -> 
hoodieConsistentHashingMetadata.getNodes().stream().anyMatch(node -> 
node.getFileIdPrefix().equals(hoodieBaseFile));
+    Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile ->
+        hoodieConsistentHashingMetadata.getNodes()
+            .stream()
+            .anyMatch(node -> node.getFileIdPrefix().equals(hoodieBaseFile));
     if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
         .map(fileIdPrefix -> 
FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate))
 {
       try {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 30bcc767274..cb0c7dd283f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -56,12 +56,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -329,9 +327,9 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
   }
 
   private String makeFilePath(HoodieLogFile logFile) {
-    return partitionPath.length() == 0
-        ? new Path(logFile.getFileName()).toString()
-        : new Path(partitionPath, logFile.getFileName()).toString();
+    return partitionPath.isEmpty()
+        ? new StoragePath(logFile.getFileName()).toString()
+        : new StoragePath(partitionPath, logFile.getFileName()).toString();
   }
 
   private void resetWriteCounts() {
@@ -525,7 +523,9 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
       // TODO we can actually deduce file size purely from AppendResult (based 
on offset and size
       //      of the appended block)
       for (WriteStatus status : statuses) {
-        long logFileSize = HadoopFSUtils.getFileSize(fs, new 
Path(config.getBasePath(), status.getStat().getPath()));
+        long logFileSize = storage.getPathInfo(
+            new StoragePath(config.getBasePath(), status.getStat().getPath()))
+            .getLength();
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index d629b8be0ea..5677f4188bd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -39,14 +39,13 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -72,7 +71,7 @@ public class HoodieCDCLogger implements Closeable {
 
   private final String partitionPath;
 
-  private final FileSystem fs;
+  private final HoodieStorage storage;
 
   private final Schema dataSchema;
 
@@ -102,14 +101,14 @@ public class HoodieCDCLogger implements Closeable {
 
   private final SizeEstimator<HoodieAvroPayload> sizeEstimator;
 
-  private final List<Path> cdcAbsPaths;
+  private final List<StoragePath> cdcAbsPaths;
 
   public HoodieCDCLogger(
       String commitTime,
       HoodieWriteConfig config,
       HoodieTableConfig tableConfig,
       String partitionPath,
-      FileSystem fs,
+      HoodieStorage storage,
       Schema schema,
       HoodieLogFormat.Writer cdcWriter,
       long maxInMemorySizeInBytes) {
@@ -119,7 +118,7 @@ public class HoodieCDCLogger implements Closeable {
           ? HoodieRecord.RECORD_KEY_METADATA_FIELD
           : tableConfig.getRecordKeyFieldProp();
       this.partitionPath = partitionPath;
-      this.fs = fs;
+      this.storage = storage;
       this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
       this.cdcWriter = cdcWriter;
       this.cdcSupplementalLoggingMode = 
tableConfig.cdcSupplementalLoggingMode();
@@ -196,7 +195,7 @@ public class HoodieCDCLogger implements Closeable {
         HoodieLogBlock block = new HoodieCDCDataBlock(records, 
cdcDataBlockHeader, keyField);
         AppendResult result = 
cdcWriter.appendBlocks(Collections.singletonList(block));
 
-        Path cdcAbsPath = new Path(result.logFile().getPath().toUri());
+        StoragePath cdcAbsPath = result.logFile().getPath();
         if (!cdcAbsPaths.contains(cdcAbsPath)) {
           cdcAbsPaths.add(cdcAbsPath);
         }
@@ -213,10 +212,10 @@ public class HoodieCDCLogger implements Closeable {
   public Map<String, Long> getCDCWriteStats() {
     Map<String, Long> stats = new HashMap<>();
     try {
-      for (Path cdcAbsPath : cdcAbsPaths) {
+      for (StoragePath cdcAbsPath : cdcAbsPaths) {
         String cdcFileName = cdcAbsPath.getName();
         String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? 
cdcFileName : partitionPath + "/" + cdcFileName;
-        stats.put(cdcPath, HadoopFSUtils.getFileSize(fs, cdcAbsPath));
+        stats.put(cdcPath, storage.getPathInfo(cdcAbsPath).getLength());
       }
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to get cdc write stat", e);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index dc61749bf93..12406927ae6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -32,14 +32,12 @@ import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -245,7 +243,7 @@ public class HoodieCreateHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     stat.setPath(new StoragePath(config.getBasePath()), path);
     stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
 
-    long fileSize = HadoopFSUtils.getFileSize(fs, new Path(path.toUri()));
+    long fileSize = storage.getPathInfo(path).getLength();
     stat.setTotalWriteBytes(fileSize);
     stat.setFileSizeInBytes(fileSize);
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
index 6865a6ac653..28c1dfbca56 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
@@ -24,14 +24,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hadoop.fs.FileSystem;
-
 public abstract class HoodieIOHandle<T, I, K, O> {
 
   protected final String instantTime;
   protected final HoodieWriteConfig config;
   protected final HoodieTable<T, I, K, O> hoodieTable;
-  protected FileSystem fs;
   protected HoodieStorage storage;
 
   HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime, 
HoodieTable<T, I, K, O> hoodieTable) {
@@ -39,7 +36,6 @@ public abstract class HoodieIOHandle<T, I, K, O> {
     this.config = config;
     this.hoodieTable = hoodieTable;
     this.storage = getStorage();
-    this.fs = (FileSystem) storage.getFileSystem();
   }
 
   public abstract HoodieStorage getStorage();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index b5dd3374d27..1bf6f6b0138 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -42,7 +42,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -52,7 +51,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -439,7 +437,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
       fileWriter.close();
       fileWriter = null;
 
-      long fileSizeInBytes = HadoopFSUtils.getFileSize(fs, new 
Path(newFilePath.toUri()));
+      long fileSizeInBytes = storage.getPathInfo(newFilePath).getLength();
       HoodieWriteStat stat = writeStatus.getStat();
 
       stat.setTotalWriteBytes(fileSizeInBytes);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index fba72310513..08be71288f9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -60,7 +60,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> 
extends HoodieMergeHandl
         config,
         hoodieTable.getMetaClient().getTableConfig(),
         partitionPath,
-        fs,
+        storage,
         getWriterSchema(),
         createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -78,7 +78,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> 
extends HoodieMergeHandl
         config,
         hoodieTable.getMetaClient().getTableConfig(),
         partitionPath,
-        fs,
+        storage,
         getWriterSchema(),
         createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index a70f5e7edba..89b58a9f116 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -86,7 +86,6 @@ import org.apache.hudi.table.storage.HoodieLayoutFactory;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -774,7 +773,9 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
       if (!invalidDataPaths.isEmpty()) {
         LOG.info("Removing duplicate files created due to task retries before 
committing. Paths=" + invalidDataPaths);
         Map<String, List<Pair<String, String>>> invalidPathsByPartition = 
invalidDataPaths.stream()
-            .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), 
new Path(basePath, dp).toString()))
+            .map(dp ->
+                Pair.of(new StoragePath(basePath, dp).getParent().toString(),
+                    new StoragePath(basePath, dp).toString()))
             .collect(Collectors.groupingBy(Pair::getKey));
 
         // Ensure all files in delete list is actually present. This is 
mandatory for an eventually consistent FS.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
index 4b12e72938d..8a73136d7e6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
@@ -20,17 +20,16 @@ package org.apache.hudi.table.action.bootstrap;
 
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,53 +44,52 @@ public class BootstrapUtils {
   /**
    * Returns leaf folders with files under a path.
    * @param baseFileFormat Hoodie base file format
-   * @param fs  File System
+   * @param storage  Hoodie Storage
    * @param context JHoodieEngineContext
    * @return list of partition paths with files under them.
    * @throws IOException
    */
-  public static List<Pair<String, List<HoodieFileStatus>>> 
getAllLeafFoldersWithFiles(HoodieFileFormat baseFileFormat,
-                                                                               
       FileSystem fs, String basePathStr, HoodieEngineContext context) throws 
IOException {
-    final Path basePath = new Path(basePathStr);
+  public static List<Pair<String, List<HoodieFileStatus>>> 
getAllLeafFoldersWithFiles(
+      HoodieFileFormat baseFileFormat,
+      HoodieStorage storage,
+      String basePathStr,
+      HoodieEngineContext context) throws IOException {
+    final StoragePath basePath = new StoragePath(basePathStr);
     final String baseFileExtension = baseFileFormat.getFileExtension();
     final Map<Integer, List<String>> levelToPartitions = new HashMap<>();
     final Map<String, List<HoodieFileStatus>> partitionToFiles = new 
HashMap<>();
-    PathFilter filePathFilter = getFilePathFilter(baseFileExtension);
-    PathFilter metaPathFilter = getExcludeMetaPathFilter();
+    StoragePathFilter filePathFilter = getFilePathFilter(baseFileExtension);
+    StoragePathFilter metaPathFilter = getExcludeMetaPathFilter();
 
-    FileStatus[] topLevelStatuses = fs.listStatus(basePath);
+    List<StoragePathInfo> topLevelPathInfos = 
storage.listDirectEntries(basePath);
     List<String> subDirectories = new ArrayList<>();
 
     List<Pair<HoodieFileStatus, Pair<Integer, String>>> result = new 
ArrayList<>();
 
-    for (FileStatus topLevelStatus: topLevelStatuses) {
-      if (topLevelStatus.isFile() && 
filePathFilter.accept(topLevelStatus.getPath())) {
-        String relativePath = HadoopFSUtils.getRelativePartitionPath(basePath, 
topLevelStatus.getPath().getParent());
+    for (StoragePathInfo topLevelPathInfo: topLevelPathInfos) {
+      if (topLevelPathInfo.isFile() && 
filePathFilter.accept(topLevelPathInfo.getPath())) {
+        String relativePath = FSUtils.getRelativePartitionPath(basePath, 
topLevelPathInfo.getPath().getParent());
         Integer level = (int) relativePath.chars().filter(ch -> ch == 
'/').count();
-        HoodieFileStatus hoodieFileStatus = 
HadoopFSUtils.fromFileStatus(topLevelStatus);
+        HoodieFileStatus hoodieFileStatus = 
FSUtils.fromPathInfo(topLevelPathInfo);
         result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
-      } else if (topLevelStatus.isDirectory() && 
metaPathFilter.accept(topLevelStatus.getPath())) {
-        subDirectories.add(topLevelStatus.getPath().toString());
+      } else if (topLevelPathInfo.isDirectory() && 
metaPathFilter.accept(topLevelPathInfo.getPath())) {
+        subDirectories.add(topLevelPathInfo.getPath().toString());
       }
     }
 
-    if (subDirectories.size() > 0) {
+    if (!subDirectories.isEmpty()) {
       result.addAll(context.flatMap(subDirectories, directory -> {
-        PathFilter pathFilter = getFilePathFilter(baseFileExtension);
-        Path path = new Path(directory);
-        FileSystem fileSystem = HadoopFSUtils.getFs(path, 
HadoopFSUtils.getStorageConf());
-        RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, 
true);
-        List<Pair<HoodieFileStatus, Pair<Integer, String>>> res = new 
ArrayList<>();
-        while (itr.hasNext()) {
-          FileStatus status = itr.next();
-          if (pathFilter.accept(status.getPath())) {
-            String relativePath = HadoopFSUtils.getRelativePartitionPath(new 
Path(basePathStr), status.getPath().getParent());
-            Integer level = (int) relativePath.chars().filter(ch -> ch == 
'/').count();
-            HoodieFileStatus hoodieFileStatus = 
HadoopFSUtils.fromFileStatus(status);
-            res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
-          }
-        }
-        return res.stream();
+        StoragePathFilter pathFilter = getFilePathFilter(baseFileExtension);
+        StoragePath path = new StoragePath(directory);
+        HoodieStorage tmpStorage = HoodieStorageUtils.getStorage(path, 
HadoopFSUtils.getStorageConf());
+        return tmpStorage.listFiles(path).stream()
+            .filter(pathInfo -> pathFilter.accept(pathInfo.getPath()))
+            .map(pathInfo -> {
+              String relativePath = FSUtils.getRelativePartitionPath(basePath, 
pathInfo.getPath().getParent());
+              Integer level = (int) relativePath.chars().filter(ch -> ch == 
'/').count();
+              HoodieFileStatus hoodieFileStatus = 
FSUtils.fromPathInfo(pathInfo);
+              return Pair.of(hoodieFileStatus, Pair.of(level, relativePath));
+            });
       }, subDirectories.size()));
     }
 
@@ -118,14 +116,12 @@ public class BootstrapUtils {
             .map(d -> Pair.of(d, 
partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>();
   }
 
-  private static PathFilter getFilePathFilter(String baseFileExtension) {
-    return (path) -> {
-      return path.getName().endsWith(baseFileExtension);
-    };
+  private static StoragePathFilter getFilePathFilter(String baseFileExtension) 
{
+    return path -> path.getName().endsWith(baseFileExtension);
   }
 
-  private static PathFilter getExcludeMetaPathFilter() {
+  private static StoragePathFilter getExcludeMetaPathFilter() {
     // Avoid listing and including any folders under the meta folder
-    return (path) -> 
!path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME);
+    return path -> 
!path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME);
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index a0f3fda6e0b..c44421e9ce2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -36,11 +36,11 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,16 +73,17 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
     this.skipLocking = skipLocking;
   }
 
-  private static Boolean deleteFileAndGetResult(FileSystem fs, String 
deletePathStr) throws IOException {
-    Path deletePath = new Path(deletePathStr);
+  private static Boolean deleteFileAndGetResult(HoodieStorage storage, String 
deletePathStr) throws IOException {
+    StoragePath deletePath = new StoragePath(deletePathStr);
     LOG.debug("Working on delete path :" + deletePath);
     try {
-      boolean isDirectory = fs.isDirectory(deletePath);
-      boolean deleteResult = fs.delete(deletePath, isDirectory);
+      boolean deleteResult = storage.getPathInfo(deletePath).isDirectory()
+          ? storage.deleteDirectory(deletePath)
+          : storage.deleteFile(deletePath);
       if (deleteResult) {
         LOG.debug("Cleaned file at path :" + deletePath);
       } else {
-        if (fs.exists(deletePath)) {
+        if (storage.exists(deletePath)) {
           throw new HoodieIOException("Failed to delete path during clean 
execution " + deletePath);
         } else {
           LOG.debug("Already cleaned up file at path :" + deletePath);
@@ -97,16 +98,15 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
 
   private static Stream<Pair<String, PartitionCleanStat>> 
deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, 
HoodieTable table) {
     Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
-    FileSystem fs = (FileSystem) table.getStorage().getFileSystem();
+    HoodieStorage storage = table.getStorage();
 
     cleanFileInfo.forEachRemaining(partitionDelFileTuple -> {
       String partitionPath = partitionDelFileTuple.getLeft();
-      Path deletePath = new 
Path(partitionDelFileTuple.getRight().getFilePath());
+      StoragePath deletePath = new 
StoragePath(partitionDelFileTuple.getRight().getFilePath());
       String deletePathStr = deletePath.toString();
       boolean deletedFileResult = false;
       try {
-        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
-
+        deletedFileResult = deleteFileAndGetResult(storage, deletePathStr);
       } catch (IOException e) {
         LOG.error("Delete file failed: " + deletePathStr, e);
       }
@@ -158,8 +158,7 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
     partitionsToBeDeleted.forEach(entry -> {
       try {
         if (!isNullOrEmpty(entry)) {
-          deleteFileAndGetResult((FileSystem) 
table.getStorage().getFileSystem(),
-              table.getMetaClient().getBasePath() + "/" + entry);
+          deleteFileAndGetResult(table.getStorage(), 
table.getMetaClient().getBasePath() + "/" + entry);
         }
       } catch (IOException e) {
         LOG.warn("Partition deletion failed " + entry);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
index 2e4e39f26b7..9d2f727aeb9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -33,12 +33,9 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -190,12 +187,12 @@ public class BaseRollbackHelper implements Serializable {
     return filesToBeDeleted.stream().map(fileToDelete -> {
       String basePath = metaClient.getBasePath().toString();
       try {
-        Path fullDeletePath = new Path(fileToDelete);
-        String partitionPath = HadoopFSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
+        StoragePath fullDeletePath = new StoragePath(fileToDelete);
+        String partitionPath = FSUtils.getRelativePartitionPath(new 
StoragePath(basePath), fullDeletePath.getParent());
         boolean isDeleted = true;
         if (doDelete) {
           try {
-            isDeleted = ((FileSystem) 
metaClient.getStorage().getFileSystem()).delete(fullDeletePath);
+            isDeleted = metaClient.getStorage().deleteFile(fullDeletePath);
           } catch (FileNotFoundException e) {
             // if first rollback attempt failed and retried again, chances 
that some files are already deleted.
             isDeleted = true;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index af3a88643a2..b2100867694 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -32,28 +32,26 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.table.timeline.MetadataConversionUtils.getHoodieCommitMetadata;
-import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 import static 
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
 
 /**
@@ -111,10 +109,10 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
       return context.flatMap(partitionPaths, partitionPath -> {
         List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
 
-        Supplier<FileStatus[]> filesToDelete = () -> {
+        Supplier<List<StoragePathInfo>> filesToDelete = () -> {
           try {
             return fetchFilesFromInstant(instantToRollback, partitionPath, 
metaClient.getBasePath().toString(), baseFileExtension,
-                (FileSystem) metaClient.getStorage().getFileSystem(),
+                metaClient.getStorage(),
                 commitMetadataOptional, isCommitMetadataCompleted, tableType);
           } catch (IOException e) {
             throw new HoodieIOException("Fetching files to delete error", e);
@@ -149,8 +147,7 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
                 // and not corresponding base commit log files created with 
this as baseCommit since updates would
                 // have been written to the log files.
                 
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
-                    listBaseFilesToBeDeleted(instantToRollback.getTimestamp(), 
baseFileExtension, partitionPath,
-                        (FileSystem) 
metaClient.getStorage().getFileSystem())));
+                    listBaseFilesToBeDeleted(instantToRollback.getTimestamp(), 
baseFileExtension, partitionPath, metaClient.getStorage())));
               } else {
                 // if this is part of a restore operation, we should 
rollback/delete entire file slice.
                 
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
@@ -182,34 +179,32 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
     }
   }
 
-  private FileStatus[] listAllFilesSinceCommit(
-      String commit,
-      String baseFileExtension,
-      String partitionPath,
-      HoodieTableMetaClient metaClient) throws IOException {
+  private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
+                                                        String 
baseFileExtension,
+                                                        String partitionPath,
+                                                        HoodieTableMetaClient 
metaClient) throws IOException {
     LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
     CompletionTimeQueryView completionTimeQueryView = new 
CompletionTimeQueryView(metaClient);
-    PathFilter filter = (path) -> {
+    StoragePathFilter filter = (path) -> {
       if (path.toString().contains(baseFileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return HoodieTimeline.compareTimestamps(commit, 
HoodieTimeline.LESSER_THAN_OR_EQUALS,
             fileCommitTime);
-      } else if (HadoopFSUtils.isLogFile(path)) {
-        String fileCommitTime = 
FSUtils.getDeltaCommitTimeFromLogPath(convertToStoragePath(path));
+      } else if (FSUtils.isLogFile(path)) {
+        String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(path);
         return completionTimeQueryView.isSlicedAfterOrOn(commit, 
fileCommitTime);
       }
       return false;
     };
-    return ((FileSystem) metaClient.getStorage().getFileSystem())
-        
.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(),
 partitionPath),
-            filter);
+    return metaClient.getStorage()
+        .listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), filter);
   }
 
   @NotNull
-  private List<HoodieRollbackRequest> getHoodieRollbackRequests(String 
partitionPath, FileStatus[] filesToDeletedStatus) {
-    return Arrays.stream(filesToDeletedStatus)
-        .map(fileStatus -> {
-          String dataFileToBeDeleted = fileStatus.getPath().toString();
+  private List<HoodieRollbackRequest> getHoodieRollbackRequests(String 
partitionPath, List<StoragePathInfo> filesToDeletedStatus) {
+    return filesToDeletedStatus.stream()
+        .map(pathInfo -> {
+          String dataFileToBeDeleted = pathInfo.getPath().toString();
           return formatDeletePath(dataFileToBeDeleted);
         })
         .map(s -> new HoodieRollbackRequest(partitionPath, EMPTY_STRING, 
EMPTY_STRING, Collections.singletonList(s), Collections.emptyMap()))
@@ -221,49 +216,56 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
     return path.substring(path.indexOf(":") + 1);
   }
 
-  private FileStatus[] listBaseFilesToBeDeleted(String commit, String 
basefileExtension, String partitionPath,
-                                                FileSystem fs) throws 
IOException {
+  private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
+                                                         String 
basefileExtension,
+                                                         String partitionPath,
+                                                         HoodieStorage 
storage) throws IOException {
     LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
-    PathFilter filter = (path) -> {
+    StoragePathFilter filter = (path) -> {
       if (path.toString().contains(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
       }
       return false;
     };
-    return 
fs.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(),
 partitionPath), filter);
+    return 
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath), filter);
   }
 
-  private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, 
String partitionPath, String basePath,
-                                             String baseFileExtension, 
FileSystem fs,
-                                             Option<HoodieCommitMetadata> 
commitMetadataOptional,
-                                             Boolean isCommitMetadataCompleted,
-                                             HoodieTableType tableType) throws 
IOException {
+  private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant 
instantToRollback,
+                                                      String partitionPath, 
String basePath,
+                                                      String 
baseFileExtension, HoodieStorage storage,
+                                                      
Option<HoodieCommitMetadata> commitMetadataOptional,
+                                                      Boolean 
isCommitMetadataCompleted,
+                                                      HoodieTableType 
tableType) throws IOException {
     // go w/ commit metadata only for COW table. for MOR, we need to get 
associated log files when commit corresponding to base file is rolledback.
     if (isCommitMetadataCompleted && tableType == 
HoodieTableType.COPY_ON_WRITE) {
       return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(),
-          baseFileExtension, fs);
+          baseFileExtension, storage);
     } else {
-      return fetchFilesFromListFiles(instantToRollback, partitionPath, 
basePath, baseFileExtension, fs);
+      return fetchFilesFromListFiles(instantToRollback, partitionPath, 
basePath, baseFileExtension, storage);
     }
   }
 
-  private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback, String partitionPath,
-                                                    String basePath, 
HoodieCommitMetadata commitMetadata,
-                                                    String baseFileExtension, 
FileSystem fs)
-      throws IOException {
-    SerializablePathFilter pathFilter = 
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
-    Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, 
partitionPath);
-
-    return fs.listStatus(Arrays.stream(filePaths).filter(entry -> {
-      try {
-        return fs.exists(entry);
-      } catch (Exception e) {
-        LOG.error("Exists check failed for " + entry.toString(), e);
-      }
-      // if any Exception is thrown, do not ignore. let's try to add the file 
of interest to be deleted. we can't miss any files to be rolled back.
-      return true;
-    }).toArray(Path[]::new), pathFilter);
+  private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback,
+                                                             String 
partitionPath,
+                                                             String basePath,
+                                                             
HoodieCommitMetadata commitMetadata,
+                                                             String 
baseFileExtension,
+                                                             HoodieStorage 
storage) throws IOException {
+    StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
+        instantToRollback.getTimestamp());
+    List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath, 
commitMetadata, partitionPath)
+        .filter(entry -> {
+          try {
+            return storage.exists(entry);
+          } catch (Exception e) {
+            LOG.error("Exists check failed for " + entry.toString(), e);
+          }
+          // if any Exception is thrown, do not ignore. let's try to add the 
file of interest to be deleted. we can't miss any files to be rolled back.
+          return true;
+        }).collect(Collectors.toList());
+
+    return storage.listDirectEntries(filePaths, pathFilter);
   }
 
   /**
@@ -272,17 +274,19 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
    * @param partitionPath
    * @param basePath
    * @param baseFileExtension
-   * @param fs
+   * @param storage
    * @return
    * @throws IOException
    */
-  private FileStatus[] fetchFilesFromListFiles(HoodieInstant 
instantToRollback, String partitionPath, String basePath,
-                                               String baseFileExtension, 
FileSystem fs)
-      throws IOException {
-    SerializablePathFilter pathFilter = 
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
-    Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);
-
-    return fs.listStatus(filePaths, pathFilter);
+  private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant 
instantToRollback,
+                                                        String partitionPath,
+                                                        String basePath,
+                                                        String 
baseFileExtension,
+                                                        HoodieStorage storage) 
throws IOException {
+    StoragePathFilter pathFilter = getPathFilter(baseFileExtension, 
instantToRollback.getTimestamp());
+    List<StoragePath> filePaths = listFilesToBeDeleted(basePath, 
partitionPath);
+
+    return storage.listDirectEntries(filePaths, pathFilter);
   }
 
   private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
@@ -291,24 +295,26 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
         && 
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType());
   }
 
-  private static Path[] listFilesToBeDeleted(String basePath, String 
partitionPath) {
-    return new Path[] 
{HadoopFSUtils.constructAbsolutePathInHadoopPath(basePath, partitionPath)};
+  private static List<StoragePath> listFilesToBeDeleted(String basePath, 
String partitionPath) {
+    return Collections.singletonList(FSUtils.constructAbsolutePath(basePath, 
partitionPath));
   }
 
-  private static Path[] getFilesFromCommitMetadata(String basePath, 
HoodieCommitMetadata commitMetadata, String partitionPath) {
+  private static Stream<StoragePath> getFilesFromCommitMetadata(String 
basePath,
+                                                                
HoodieCommitMetadata commitMetadata,
+                                                                String 
partitionPath) {
     List<String> fullPaths = 
commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath);
-    return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+    return fullPaths.stream().map(StoragePath::new);
   }
 
   @NotNull
-  private static SerializablePathFilter getSerializablePathFilter(String 
basefileExtension, String commit) {
+  private static StoragePathFilter getPathFilter(String basefileExtension, 
String commit) {
     return (path) -> {
       if (path.toString().endsWith(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
-      } else if (HadoopFSUtils.isLogFile(path)) {
+      } else if (FSUtils.isLogFile(path)) {
         // Since the baseCommitTime is the only commit for new log files, it's 
okay here
-        String fileCommitTime = 
FSUtils.getDeltaCommitTimeFromLogPath(convertToStoragePath(path));
+        String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(path);
         return commit.equals(fileCommitTime);
       }
       return false;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 84fe13c9a39..8a63d6865ac 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -20,16 +20,16 @@ package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
 
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,9 +71,9 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> 
implements BaseRollbackPlan
         String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
         IOType type = IOType.valueOf(typeStr);
         String filePathStr = WriteMarkers.stripMarkerSuffix(markerFilePath);
-        Path filePath = new Path(basePath, filePathStr);
-        String partitionPath = HadoopFSUtils.getRelativePartitionPath(new 
Path(basePath), filePath.getParent());
-        String fileId = HadoopFSUtils.getFileIdFromFilePath(filePath);
+        StoragePath filePath = new StoragePath(basePath, filePathStr);
+        String partitionPath = FSUtils.getRelativePartitionPath(new 
StoragePath(basePath), filePath.getParent());
+        String fileId = FSUtils.getFileIdFromFilePath(filePath);
         switch (type) {
           case MERGE:
           case CREATE:
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
deleted file mode 100644
index e2affdf5ca8..00000000000
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hadoop.fs.PathFilter;
-
-import java.io.Serializable;
-
-public interface SerializablePathFilter extends PathFilter, Serializable {
-}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index f7ed9370a95..86a0c6f0aea 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -32,18 +32,13 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,23 +106,18 @@ public class DirectWriteMarkers extends WriteMarkers {
       }
     }
 
-    if (subDirectories.size() > 0) {
+    if (!subDirectories.isEmpty()) {
       parallelism = Math.min(subDirectories.size(), parallelism);
       StorageConfiguration<?> storageConf = storage.getConf();
       context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker 
files for all created, merged paths");
       dataFiles.addAll(context.flatMap(subDirectories, directory -> {
-        Path path = new Path(directory);
-        FileSystem fileSystem = HadoopFSUtils.getFs(path, storageConf);
-        RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, 
true);
-        List<String> result = new ArrayList<>();
-        while (itr.hasNext()) {
-          FileStatus status = itr.next();
-          String pathStr = status.getPath().toString();
-          if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && 
!pathStr.endsWith(IOType.APPEND.name())) {
-            result.add(translateMarkerToDataPath(pathStr));
-          }
-        }
-        return result.stream();
+        StoragePath path = new StoragePath(directory);
+        HoodieStorage storage = HoodieStorageUtils.getStorage(path, 
storageConf);
+        return storage.listFiles(path).stream()
+            .map(pathInfo -> pathInfo.getPath().toString())
+            .filter(pathStr -> 
pathStr.contains(HoodieTableMetaClient.MARKER_EXTN)
+                && !pathStr.endsWith(IOType.APPEND.name()))
+            .map(this::translateMarkerToDataPath);
       }, parallelism));
     }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java
index 3d984ba781c..d09cc74f14e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java
@@ -24,7 +24,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
 import org.apache.hudi.storage.HoodieStorage;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +47,7 @@ public class 
SimpleTransactionDirectMarkerBasedDetectionStrategy
   @Override
   public void detectAndResolveConflictIfNecessary() throws 
HoodieEarlyConflictDetectionException {
     DirectMarkerTransactionManager txnManager =
-        new DirectMarkerTransactionManager((HoodieWriteConfig) config,
-            (FileSystem) storage.getFileSystem(), partitionPath, fileId);
+        new DirectMarkerTransactionManager((HoodieWriteConfig) config, 
storage, partitionPath, fileId);
     try {
       // Need to do transaction before create marker file when using early 
conflict detection
       txnManager.beginTransaction(instantTime);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
index 17def321f4a..f4eb9ef31cf 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
@@ -31,10 +31,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.hadoop.fs.PathFilter;
-
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -168,10 +165,4 @@ public final class RepairUtils {
       return Collections.emptyList();
     }
   }
-
-  /**
-   * Serializable path filter class for Spark job.
-   */
-  public interface SerializablePathFilter extends PathFilter, Serializable {
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java
index 592563b4991..697713dbceb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java
@@ -24,11 +24,9 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +74,7 @@ public class FourToFiveUpgradeHandler implements 
UpgradeHandler {
     }
   }
 
-  private boolean hasDefaultPartitionPath(HoodieWriteConfig config, 
HoodieTable  table) throws IOException {
+  private boolean hasDefaultPartitionPath(HoodieWriteConfig config, 
HoodieTable table) throws IOException {
     HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
     if (!tableConfig.isTablePartitioned()) {
       return false;
@@ -88,7 +86,7 @@ public class FourToFiveUpgradeHandler implements 
UpgradeHandler {
       String[] partitions = tableConfig.getPartitionFields().get();
       checkPartitionPath = partitions[0] + "=" + 
DEPRECATED_DEFAULT_PARTITION_PATH;
     }
-    FileSystem fs = HadoopFSUtils.getFs(config.getBasePath(), 
table.getStorageConf());
-    return fs.exists(new Path(config.getBasePath() + "/" + 
checkPartitionPath));
+
+    return table.getStorage().exists(new StoragePath(config.getBasePath() + 
"/" + checkPartitionPath));
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 096c4e8135f..2d4e8abf89c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -30,7 +30,6 @@ import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +48,8 @@ public class UpgradeDowngrade {
   private HoodieTableMetaClient metaClient;
   protected HoodieWriteConfig config;
   protected HoodieEngineContext context;
-  private Path updatedPropsFilePath;
-  private Path propsFilePath;
+  private StoragePath updatedPropsFilePath;
+  private StoragePath propsFilePath;
 
   public UpgradeDowngrade(
       HoodieTableMetaClient metaClient, HoodieWriteConfig config, 
HoodieEngineContext context,
@@ -58,8 +57,8 @@ public class UpgradeDowngrade {
     this.metaClient = metaClient;
     this.config = config;
     this.context = context;
-    this.updatedPropsFilePath = new Path(metaClient.getMetaPath().toString(), 
HOODIE_UPDATED_PROPERTY_FILE);
-    this.propsFilePath = new Path(metaClient.getMetaPath().toString(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+    this.updatedPropsFilePath = new StoragePath(metaClient.getMetaPath(), 
HOODIE_UPDATED_PROPERTY_FILE);
+    this.propsFilePath = new StoragePath(metaClient.getMetaPath(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     this.upgradeDowngradeHelper = upgradeDowngradeHelper;
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
index 0356639bdeb..631aeebd58e 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
@@ -32,11 +32,11 @@ import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,7 +154,7 @@ public class HoodieTestCommitGenerator {
         fileInfoList.forEach(fileInfo -> {
           HoodieWriteStat writeStat = new HoodieWriteStat();
           writeStat.setPartitionPath(partitionPath);
-          writeStat.setPath(new Path(partitionPath, 
fileInfo.getValue()).toString());
+          writeStat.setPath(new StoragePath(partitionPath, 
fileInfo.getValue()).toString());
           writeStat.setFileId(fileInfo.getKey());
           // Below are dummy values
           writeStat.setTotalWriteBytes(10000);
@@ -171,8 +171,9 @@ public class HoodieTestCommitGenerator {
   public static void createCommitFileWithMetadata(
       String basePath, StorageConfiguration<?> storageConf,
       String filename, byte[] content) throws IOException {
-    Path commitFilePath = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/" + filename);
-    try (OutputStream os = HadoopFSUtils.getFs(basePath, 
storageConf).create(commitFilePath, true)) {
+    HoodieStorage storage = HoodieStorageUtils.getStorage(basePath, 
storageConf);
+    StoragePath commitFilePath = new StoragePath(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/" + filename);
+    try (OutputStream os = storage.create(commitFilePath, true)) {
       os.write(content);
     }
   }
@@ -180,14 +181,14 @@ public class HoodieTestCommitGenerator {
   public static void createDataFile(
       String basePath, StorageConfiguration<?> storageConf,
       String partitionPath, String filename) throws IOException {
-    FileSystem fs = HadoopFSUtils.getFs(basePath, storageConf);
-    Path filePath = new Path(new Path(basePath, partitionPath), filename);
-    Path parent = filePath.getParent();
-    if (!fs.exists(parent)) {
-      fs.mkdirs(parent);
+    HoodieStorage storage = HoodieStorageUtils.getStorage(basePath, 
storageConf);
+    StoragePath filePath = new StoragePath(new StoragePath(basePath, 
partitionPath), filename);
+    StoragePath parent = filePath.getParent();
+    if (!storage.exists(parent)) {
+      storage.createDirectory(parent);
     }
-    if (!fs.exists(filePath)) {
-      fs.create(filePath);
+    if (!storage.exists(filePath)) {
+      storage.create(filePath);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
index 2df166c1c71..30d239677f9 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
@@ -22,15 +22,14 @@ import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieLockException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
+import org.apache.hudi.storage.StoragePath;
 
 import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
@@ -47,8 +46,8 @@ public class FileSystemBasedLockProviderTestClass implements 
LockProvider<String
 
   private final int retryMaxCount;
   private final int retryWaitTimeMs;
-  private transient FileSystem fs;
-  private transient Path lockFile;
+  private transient HoodieStorage storage;
+  private transient StoragePath lockFile;
   protected LockConfiguration lockConfiguration;
 
   public FileSystemBasedLockProviderTestClass(final LockConfiguration 
lockConfiguration, final StorageConfiguration<?> configuration) {
@@ -56,15 +55,15 @@ public class FileSystemBasedLockProviderTestClass 
implements LockProvider<String
     final String lockDirectory = 
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
     this.retryWaitTimeMs = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY);
     this.retryMaxCount = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY);
-    this.lockFile = new Path(lockDirectory + "/" + LOCK);
-    this.fs = HadoopFSUtils.getFs(this.lockFile.toString(), configuration);
+    this.lockFile = new StoragePath(lockDirectory + "/" + LOCK);
+    this.storage = HoodieStorageUtils.getStorage(this.lockFile.toString(), 
configuration);
   }
 
   @Override
   public void close() {
     synchronized (LOCK) {
       try {
-        fs.delete(this.lockFile, true);
+        storage.deleteDirectory(this.lockFile);
       } catch (IOException e) {
         throw new HoodieLockException("Unable to release lock: " + getLock(), 
e);
       }
@@ -76,7 +75,7 @@ public class FileSystemBasedLockProviderTestClass implements 
LockProvider<String
     try {
       int numRetries = 0;
       synchronized (LOCK) {
-        while (fs.exists(this.lockFile)) {
+        while (storage.exists(this.lockFile)) {
           LOCK.wait(retryWaitTimeMs);
           numRetries++;
           if (numRetries > retryMaxCount) {
@@ -84,7 +83,7 @@ public class FileSystemBasedLockProviderTestClass implements 
LockProvider<String
           }
         }
         acquireLock();
-        return fs.exists(this.lockFile);
+        return storage.exists(this.lockFile);
       }
     } catch (IOException | InterruptedException e) {
       throw new HoodieLockException("Failed to acquire lock: " + getLock(), e);
@@ -95,8 +94,8 @@ public class FileSystemBasedLockProviderTestClass implements 
LockProvider<String
   public void unlock() {
     synchronized (LOCK) {
       try {
-        if (fs.exists(this.lockFile)) {
-          fs.delete(this.lockFile, true);
+        if (storage.exists(this.lockFile)) {
+          storage.deleteDirectory(this.lockFile);
         }
       } catch (IOException io) {
         throw new HoodieIOException("Unable to delete lock " + getLock() + "on 
disk", io);
@@ -111,7 +110,7 @@ public class FileSystemBasedLockProviderTestClass 
implements LockProvider<String
 
   private void acquireLock() {
     try {
-      fs.create(this.lockFile, false).close();
+      storage.create(this.lockFile, false).close();
     } catch (IOException e) {
       throw new HoodieIOException("Failed to acquire lock: " + getLock(), e);
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
index 398ce60f811..68959518850 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
@@ -28,7 +28,6 @@ import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -79,8 +78,7 @@ public class TestLockManager extends HoodieCommonTestHarness {
   @ValueSource(booleans = {true, false})
   void testLockAndUnlock(boolean multiWriter) {
     HoodieWriteConfig writeConfig = multiWriter ? getMultiWriterWriteConfig() 
: getSingleWriterWriteConfig();
-    LockManager lockManager = new LockManager(writeConfig,
-        (FileSystem) this.metaClient.getStorage().getFileSystem());
+    LockManager lockManager = new LockManager(writeConfig, 
this.metaClient.getStorage());
     LockManager mockLockManager = Mockito.spy(lockManager);
 
     assertDoesNotThrow(() -> {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
index 31f113d1890..12e660477ed 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -133,7 +132,7 @@ public class TestRepairUtils {
     Set<String> expectedPaths = partitionToFileIdAndNameMap.entrySet().stream()
         .flatMap(entry ->
             entry.getValue().stream()
-                .map(fileInfo -> new Path(entry.getKey(), 
fileInfo.getValue()).toString())
+                .map(fileInfo -> new StoragePath(entry.getKey(), 
fileInfo.getValue()).toString())
                 .collect(Collectors.toList())
                 .stream()
         ).collect(Collectors.toSet());
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index 809f01b5576..20c85a396c7 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -85,7 +85,6 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.IOException;
@@ -690,7 +689,7 @@ public abstract class HoodieWriterClientTestHarness extends 
HoodieCommonTestHarn
       String markerName = 
MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
                       markerFolderPath, storage, context, 1).values().stream()
               .flatMap(Collection::stream).findFirst().get();
-      partitionPath = new Path(markerFolderPath, 
markerName).getParent().toString();
+      partitionPath = new StoragePath(markerFolderPath, 
markerName).getParent().toString();
     } else {
       partitionPath = storage.globEntries(
                       new StoragePath(String.format("%s/*/*/*/*", 
markerFolderPath)), path ->
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index b1049e1d73c..c99e801b6ba 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -33,7 +33,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +62,7 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, 
O>
         config,
         hoodieTable.getMetaClient().getTableConfig(),
         partitionPath,
-        (FileSystem) getStorage().getFileSystem(),
+        getStorage(),
         getWriterSchema(),
         createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 040c7d5b514..0483096ce9a 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -32,7 +32,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +60,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
         config,
         hoodieTable.getMetaClient().getTableConfig(),
         partitionPath,
-        (FileSystem) getStorage().getFileSystem(),
+        getStorage(),
         getWriterSchema(),
         createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 5bbc744c32f..b19a202cc77 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -48,9 +48,10 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
@@ -58,7 +59,6 @@ import 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,7 +83,7 @@ public class SparkBootstrapCommitActionExecutor<T>
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkBootstrapCommitActionExecutor.class);
   protected String bootstrapSchema = null;
-  private transient FileSystem bootstrapSourceFileSystem;
+  private transient HoodieStorage bootstrapSourceStorage;
 
   public SparkBootstrapCommitActionExecutor(HoodieSparkEngineContext context,
                                             HoodieWriteConfig config,
@@ -100,7 +100,7 @@ public class SparkBootstrapCommitActionExecutor<T>
         HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
         WriteOperationType.BOOTSTRAP,
         extraMetadata);
-    bootstrapSourceFileSystem = 
HadoopFSUtils.getFs(config.getBootstrapSourceBasePath(), storageConf);
+    bootstrapSourceStorage = 
HoodieStorageUtils.getStorage(config.getBootstrapSourceBasePath(), storageConf);
   }
 
   private void validate() {
@@ -265,7 +265,7 @@ public class SparkBootstrapCommitActionExecutor<T>
    */
   private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> 
listAndProcessSourcePartitions() throws IOException {
     List<Pair<String, List<HoodieFileStatus>>> folders = 
BootstrapUtils.getAllLeafFoldersWithFiles(
-        table.getBaseFileFormat(), bootstrapSourceFileSystem, 
config.getBootstrapSourceBasePath(), context);
+        table.getBaseFileFormat(), bootstrapSourceStorage, 
config.getBootstrapSourceBasePath(), context);
 
     LOG.info("Fetching Bootstrap Schema !!");
     HoodieBootstrapSchemaProvider sourceSchemaProvider = new 
HoodieSparkBootstrapSchemaProvider(config);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
index 97bcb8806b9..eca6cafe60a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
@@ -24,7 +24,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -72,7 +71,7 @@ public class TestBootstrapUtils extends HoodieClientTestBase {
     List<Pair<String, List<HoodieFileStatus>>> collected =
         BootstrapUtils.getAllLeafFoldersWithFiles(
             metaClient.getTableConfig().getBaseFileFormat(),
-            (FileSystem) metaClient.getStorage().getFileSystem(),
+            metaClient.getStorage(),
             basePath, context);
     assertEquals(3, collected.size());
     collected.forEach(k -> assertEquals(2, k.getRight().size()));
@@ -81,7 +80,7 @@ public class TestBootstrapUtils extends HoodieClientTestBase {
     collected =
         BootstrapUtils.getAllLeafFoldersWithFiles(
             metaClient.getTableConfig().getBaseFileFormat(),
-            (FileSystem) metaClient.getStorage().getFileSystem(),
+            metaClient.getStorage(),
             basePath + "/" + folders.get(0), context);
     assertEquals(1, collected.size());
     collected.forEach(k -> assertEquals(2, k.getRight().size()));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
index 206e243ba17..38f7b60b5ee 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
@@ -39,14 +39,13 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanner;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -64,6 +63,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /**
@@ -71,11 +71,11 @@ import static org.mockito.Mockito.when;
  */
 public class TestCleanActionExecutor {
 
-  private static final StorageConfiguration<Configuration> CONF = 
getDefaultStorageConf();
+  private static final StorageConfiguration<?> CONF = getDefaultStorageConf();
   private final HoodieEngineContext context = new 
HoodieLocalEngineContext(CONF);
   private final HoodieTable<?, ?, ?, ?> mockHoodieTable = 
mock(HoodieTable.class);
   private HoodieTableMetaClient metaClient;
-  private FileSystem fs;
+  private HoodieStorage storage;
 
   private static String PARTITION1 = "partition1";
 
@@ -88,12 +88,9 @@ public class TestCleanActionExecutor {
     when(mockHoodieTable.getMetaClient()).thenReturn(metaClient);
     HoodieTableConfig tableConfig = new HoodieTableConfig();
     when(metaClient.getTableConfig()).thenReturn(tableConfig);
-    HoodieStorage storage = mock(HoodieStorage.class);
+    storage = spy(HoodieStorageUtils.getStorage(CONF));
     when(metaClient.getStorage()).thenReturn(storage);
     when(mockHoodieTable.getStorage()).thenReturn(storage);
-    fs = mock(FileSystem.class);
-    when(storage.getFileSystem()).thenReturn(fs);
-    when(fs.getConf()).thenReturn(CONF.unwrap());
   }
 
   @ParameterizedTest
@@ -102,23 +99,26 @@ public class TestCleanActionExecutor {
     HoodieWriteConfig config = getCleanByCommitsConfig();
     String fileGroup = UUID.randomUUID() + "-0";
     HoodieBaseFile baseFile = new 
HoodieBaseFile(String.format("/tmp/base/%s_1-0-1_%s.parquet", fileGroup, 
"001"));
-    FileSystem localFs = new 
Path(baseFile.getPath()).getFileSystem(CONF.unwrap());
-    Path filePath = new Path(baseFile.getPath());
-    localFs.create(filePath);
+    HoodieStorage localStorage = 
HoodieStorageUtils.getStorage(baseFile.getPath(), CONF);
+    StoragePath filePath = new StoragePath(baseFile.getPath());
+
     if (failureType == CleanFailureType.TRUE_ON_DELETE) {
-      when(fs.delete(filePath, false)).thenReturn(true);
+      when(storage.deleteFile(filePath)).thenReturn(true);
     } else if (failureType == 
CleanFailureType.FALSE_ON_DELETE_IS_EXISTS_FALSE) {
-      when(fs.delete(filePath, false)).thenReturn(false);
-      when(fs.exists(filePath)).thenReturn(false);
+      when(storage.deleteFile(filePath)).thenReturn(false);
+      when(storage.exists(filePath)).thenReturn(false);
     } else if (failureType == CleanFailureType.FALSE_ON_DELETE_IS_EXISTS_TRUE) 
{
-      when(fs.delete(filePath, false)).thenReturn(false);
-      when(fs.exists(filePath)).thenReturn(true);
+      when(storage.deleteFile(filePath)).thenReturn(false);
+      when(storage.exists(filePath)).thenReturn(true);
     } else if (failureType == CleanFailureType.FILE_NOT_FOUND_EXC_ON_DELETE) {
-      when(fs.delete(filePath, false)).thenThrow(new 
FileNotFoundException("throwing file not found exception"));
+      when(storage.deleteFile(filePath)).thenThrow(new 
FileNotFoundException("throwing file not found exception"));
     } else {
       // run time exception
-      when(fs.delete(filePath, false)).thenThrow(new 
RuntimeException("throwing run time exception"));
+      when(storage.deleteFile(filePath)).thenThrow(new 
RuntimeException("throwing run time exception"));
     }
+    // we have to create the actual file after setting up mock logic for 
storage
+    // otherwise the file created would be deleted when setting up because 
storage is a spy
+    localStorage.create(filePath);
 
     Map<String, List<HoodieCleanFileInfo>> partitionCleanFileInfoMap = new 
HashMap<>();
     List<HoodieCleanFileInfo> cleanFileInfos = Collections.singletonList(new 
HoodieCleanFileInfo(baseFile.getPath(), false));
@@ -165,7 +165,7 @@ public class TestCleanActionExecutor {
     });
   }
 
-  private void assertCleanExecutionSuccess(CleanActionExecutor 
cleanActionExecutor, Path filePath) {
+  private void assertCleanExecutionSuccess(CleanActionExecutor 
cleanActionExecutor, StoragePath filePath) {
     HoodieCleanMetadata cleanMetadata = cleanActionExecutor.execute();
     assertTrue(cleanMetadata.getPartitionMetadata().containsKey(PARTITION1));
     HoodieCleanPartitionMetadata cleanPartitionMetadata = 
cleanMetadata.getPartitionMetadata().get(PARTITION1);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index f4d29a138fe..65fb5073d7a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.common.fs;
 
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.avro.model.HoodiePath;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -605,6 +607,31 @@ public class FSUtils {
     return false;
   }
 
+  public static HoodiePath fromStoragePath(StoragePath path) {
+    if (null == path) {
+      return null;
+    }
+    return HoodiePath.newBuilder().setUri(path.toString()).build();
+  }
+
+  public static HoodieFileStatus fromPathInfo(StoragePathInfo pathInfo) {
+    if (null == pathInfo) {
+      return null;
+    }
+
+    HoodieFileStatus fStatus = new HoodieFileStatus();
+
+    fStatus.setPath(fromStoragePath(pathInfo.getPath()));
+    fStatus.setLength(pathInfo.getLength());
+    fStatus.setIsDir(pathInfo.isDirectory());
+    fStatus.setBlockReplication((int) pathInfo.getBlockReplication());
+    fStatus.setBlockSize(pathInfo.getBlockSize());
+    fStatus.setModificationTime(pathInfo.getModificationTime());
+    fStatus.setAccessTime(pathInfo.getModificationTime());
+
+    return fStatus;
+  }
+
   /**
    * Processes sub-path in parallel.
    *
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/SerializablePath.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/SerializablePath.java
deleted file mode 100644
index c814a3ed969..00000000000
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/SerializablePath.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.hadoop.fs;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.util.Objects;
-
-/**
- * {@link Serializable} wrapper encapsulating {@link Path}
- */
-public class SerializablePath implements Serializable {
-
-  private Path path;
-
-  public SerializablePath(Path path) {
-    this.path = path;
-  }
-
-  public Path get() {
-    return path;
-  }
-
-  private void writeObject(ObjectOutputStream out) throws IOException {
-    out.writeObject(path.toUri());
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-    URI uri = (URI) in.readObject();
-    path = new CachingPath(uri);
-  }
-
-  @Override
-  public String toString() {
-    return path.toString();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    SerializablePath that = (SerializablePath) o;
-    return Objects.equals(path, that.path);
-  }
-}
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
index 10826c7aace..e203f502a7d 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -211,6 +211,18 @@ public class HoodieHadoopStorage extends HoodieStorage {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList,
+                                                 StoragePathFilter filter) 
throws IOException {
+    return Arrays.stream(fs.listStatus(
+            pathList.stream()
+                .map(HadoopFSUtils::convertToHadoopPath)
+                .toArray(Path[]::new),
+            e -> filter.accept(convertToStoragePath(e))))
+        .map(HadoopFSUtils::convertToStoragePathInfo)
+        .collect(Collectors.toList());
+  }
+
   @Override
   public List<StoragePathInfo> globEntries(StoragePath pathPattern)
       throws IOException {
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index fc5ae52de54..18c21d73b3b 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -426,6 +426,26 @@ public abstract class HoodieStorage implements Closeable {
     return result;
   }
 
+  /**
+   * Lists the file info of the direct files/directories in the given list of 
paths
+   * and filters the results, if the paths are directory.
+   *
+   * @param pathList given path list.
+   * @param filter filter to apply.
+   * @return the list of path info of the files/directories in the given paths.
+   * @throws FileNotFoundException when the path does not exist.
+   * @throws IOException           IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList,
+                                                 StoragePathFilter filter) 
throws IOException {
+    List<StoragePathInfo> result = new ArrayList<>();
+    for (StoragePath path : pathList) {
+      result.addAll(listDirectEntries(path, filter));
+    }
+    return result;
+  }
+
   /**
    * Returns all the files that match the pathPattern and are not checksum 
files.
    *
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
index c953fbab7a9..0f6fa99e284 100644
--- 
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
+++ 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
@@ -270,6 +270,41 @@ public abstract class TestHoodieStorageBase {
             e -> e.getParent().getName().equals("y") && 
e.getName().contains("1")));
   }
 
+  @Test
+  public void testListingWithFilter() throws IOException {
+    HoodieStorage storage = getStorage();
+    // Full list:
+    // w/1.file
+    // w/2.file
+    // x/1.file
+    // x/2.file
+    // x/y/1.file
+    // x/y/2.file
+    // x/z/1.file
+    // x/z/2.file
+    prepareFilesOnStorage(storage);
+
+    validatePathInfoList(
+        Arrays.stream(new StoragePathInfo[] {
+            getStoragePathInfo("x/y/2.file", false)
+        }).collect(Collectors.toList()),
+        storage.listDirectEntries(
+            new StoragePath(getTempDir(), "x/y"),
+            path -> path.getName().contains("2")));
+
+    validatePathInfoList(
+        Arrays.stream(new StoragePathInfo[] {
+            getStoragePathInfo("w/2.file", false),
+            getStoragePathInfo("x/y/2.file", false),
+            getStoragePathInfo("x/z/2.file", false)
+        }).collect(Collectors.toList()),
+        storage.listDirectEntries(Arrays.stream(new StoragePath[] {
+            new StoragePath(getTempDir(), "w"),
+            new StoragePath(getTempDir(), "x/y"),
+            new StoragePath(getTempDir(), "x/z")
+        }).collect(Collectors.toList()), path -> 
path.getName().equals("2.file")));
+  }
+
   @Test
   public void testFileNotFound() throws IOException {
     HoodieStorage storage = getStorage();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 99fcdcbf8a3..6969357763e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -177,7 +177,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     String filePath =
         HadoopFSUtils.toPath(
             
BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(),
-                    (FileSystem) metaClient.getStorage().getFileSystem(),
+                    metaClient.getStorage(),
                     srcPath, context).stream().findAny().map(p -> 
p.getValue().stream().findAny())
                 .orElse(null).get().getPath()).toString();
     HoodieAvroParquetReader parquetReader =
@@ -286,7 +286,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     metaClient.reloadActiveTimeline();
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
     assertEquals(0L, 
BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(),
-            (FileSystem) metaClient.getStorage().getFileSystem(), basePath, 
context)
+            metaClient.getStorage(), basePath, context)
         .stream().mapToLong(f -> f.getValue().size()).sum());
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -314,7 +314,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, 
partitions, updateSPath);
     JavaRDD<HoodieRecord> updateBatch =
         generateInputBatch(jsc, 
BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(),
-                (FileSystem) metaClient.getStorage().getFileSystem(), 
updateSPath, context),
+                metaClient.getStorage(), updateSPath, context),
                 schema);
     String newInstantTs = client.startCommit();
     client.upsert(updateBatch, newInstantTs);
@@ -389,7 +389,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     if (checkNumRawFiles) {
       List<HoodieFileStatus> files =
           
BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(),
-              (FileSystem) metaClient.getStorage().getFileSystem(),
+              metaClient.getStorage(),
           bootstrapBasePath, context).stream().flatMap(x -> 
x.getValue().stream()).collect(Collectors.toList());
       assertEquals(files.size() * numVersions,
           sqlContext.sql("select distinct _hoodie_file_name from 
bootstrapped").count());
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index f8c8527f413..613e620fc27 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -60,7 +60,6 @@ import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.orc.OrcFile;
@@ -157,7 +156,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     }
     String filePath = 
HadoopFSUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(
             metaClient.getTableConfig().getBaseFileFormat(),
-            (FileSystem) metaClient.getStorage().getFileSystem(),
+            metaClient.getStorage(),
             srcPath, context).stream().findAny().map(p -> 
p.getValue().stream().findAny())
         .orElse(null).get().getPath()).toString();
     Reader orcReader =
@@ -271,7 +270,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
     assertEquals(0L,
         
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
-                (FileSystem) metaClient.getStorage().getFileSystem(), 
basePath, context)
+                metaClient.getStorage(), basePath, context)
             .stream().flatMap(f -> f.getValue().stream()).count());
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -300,7 +299,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     JavaRDD<HoodieRecord> updateBatch =
         generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(
                 metaClient.getTableConfig().getBaseFileFormat(),
-                (FileSystem) metaClient.getStorage().getFileSystem(), 
updateSPath, context),
+                metaClient.getStorage(), updateSPath, context),
             schema);
     String newInstantTs = client.startCommit();
     client.upsert(updateBatch, newInstantTs);
@@ -374,8 +373,8 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     if (checkNumRawFiles) {
       List<HoodieFileStatus> files =
           
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
-                  (FileSystem) metaClient.getStorage().getFileSystem(),
-                  bootstrapBasePath, context).stream().flatMap(x -> 
x.getValue().stream())
+                  metaClient.getStorage(), bootstrapBasePath, context)
+              .stream().flatMap(x -> x.getValue().stream())
               .collect(Collectors.toList());
       assertEquals(files.size() * numVersions,
           sqlContext.sql("select distinct _hoodie_file_name from 
bootstrapped").count());

Reply via email to