This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1536d50870a2f78f0a70e80bcc55053e9ac76dac
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 15 06:25:12 2024 -0700

    [HUDI-7745] Move Hadoop-dependent util methods to hudi-hadoop-common 
(#11193)
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    |   4 +-
 .../hudi/client/utils/CommitMetadataUtils.java     |   4 +-
 .../index/bucket/ConsistentBucketIndexUtils.java   |   5 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   3 +-
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   |   4 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |   3 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   3 +-
 .../table/action/bootstrap/BootstrapUtils.java     |  11 +-
 .../table/action/rollback/BaseRollbackHelper.java  |   5 +-
 .../rollback/ListingBasedRollbackStrategy.java     |   9 +-
 .../rollback/MarkerBasedRollbackStrategy.java      |   7 +-
 .../HoodieSparkBootstrapSchemaProvider.java        |   4 +-
 .../bootstrap/MetadataBootstrapHandlerFactory.java |   9 +-
 .../java/org/apache/hudi/table/TestCleaner.java    |   7 +-
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java |   3 +-
 .../hudi/common/bootstrap/FileStatusUtils.java     |  86 -------
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 229 +++--------------
 .../org/apache/hudi/common/util/ConfigUtils.java   |  57 -----
 .../apache/hudi/common/util/TestConfigUtils.java   |  10 +-
 .../sink/compact/ITTestHoodieFlinkCompactor.java   |   3 +-
 .../apache/hudi/common/util/HadoopConfigUtils.java |  91 +++++++
 .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java   | 275 +++++++++++++++++++++
 .../org/apache/hudi/common/fs/TestFSUtils.java     |   6 +-
 .../table/view/TestHoodieTableFileSystemView.java  |   6 +-
 .../hudi/common/testutils/HoodieTestTable.java     |   3 +-
 .../hudi/common/util/TestHadoopConfigUtils.java    |  63 +++++
 .../hudi/hadoop/HoodieROTablePathFilter.java       |   3 +-
 .../realtime/HoodieParquetRealtimeInputFormat.java |   3 +-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |   2 +-
 .../TestHoodieMergeOnReadSnapshotReader.java       |   2 +-
 .../realtime/TestHoodieRealtimeRecordReader.java   |   3 +-
 .../SparkFullBootstrapDataProviderBase.java        |   4 +-
 .../procedures/ShowInvalidParquetProcedure.scala   |   2 +-
 .../org/apache/hudi/functional/TestBootstrap.java  |   7 +-
 .../apache/hudi/functional/TestOrcBootstrap.java   |  10 +-
 .../apache/hudi/sync/adb/HoodieAdbJdbcClient.java  |  12 +-
 .../org/apache/hudi/hive/ddl/HMSDDLExecutor.java   |   7 +-
 .../hudi/hive/ddl/QueryBasedDDLExecutor.java       |   7 +-
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |   4 +-
 .../apache/hudi/sync/common/HoodieSyncClient.java  |   6 +-
 .../apache/hudi/sync/common/HoodieSyncConfig.java  |   4 +-
 .../apache/hudi/sync/common/HoodieSyncTool.java    |   4 +-
 .../hudi/utilities/HoodieDataTableUtils.java       |   2 +-
 .../apache/hudi/utilities/HoodieRepairTool.java    |   7 +-
 .../hudi/utilities/HoodieSnapshotCopier.java       |   2 +-
 .../hudi/utilities/HoodieSnapshotExporter.java     |   2 +-
 46 files changed, 570 insertions(+), 433 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 11e3eaea1c0..d379109a624 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -301,7 +301,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     try {
       StorageDescriptor sd = table.storageDescriptor();
       List<PartitionInput> partitionInputList = 
partitionsToAdd.stream().map(partition -> {
-        String fullPartitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(s3aToS3(getBasePath()), 
partition).toString();
+        String fullPartitionPath = 
FSUtils.constructAbsolutePath(s3aToS3(getBasePath()), partition).toString();
         List<String> partitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(partition);
         StorageDescriptor partitionSD = sd.copy(copySd -> 
copySd.location(fullPartitionPath));
         return 
PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
@@ -345,7 +345,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     try {
       StorageDescriptor sd = table.storageDescriptor();
       List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = 
changedPartitions.stream().map(partition -> {
-        String fullPartitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(s3aToS3(getBasePath()), 
partition).toString();
+        String fullPartitionPath = 
FSUtils.constructAbsolutePath(s3aToS3(getBasePath()), partition).toString();
         List<String> partitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(partition);
         StorageDescriptor partitionSD = sd.copy(copySd -> 
copySd.location(fullPartitionPath));
         PartitionInput partitionInput = 
PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java
index 64f55b09e80..56014542394 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java
@@ -151,7 +151,7 @@ public class CommitMetadataUtils {
     List<String> logFilePaths = new ArrayList<>(logFilesMarkerPath);
     HoodiePairData<String, List<String>> partitionPathLogFilePair = 
context.parallelize(logFilePaths).mapToPair(logFilePath -> {
       Path logFileFullPath = new Path(basePathStr, logFilePath);
-      String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePathStr), logFileFullPath.getParent());
+      String partitionPath = HadoopFSUtils.getRelativePartitionPath(new 
Path(basePathStr), logFileFullPath.getParent());
       return Pair.of(partitionPath, 
Collections.singletonList(logFileFullPath.getName()));
     });
     HoodiePairData<String, Map<String, List<String>>> 
partitionPathToFileIdAndLogFileList = partitionPathLogFilePair
@@ -169,7 +169,7 @@ public class CommitMetadataUtils {
           List<String> missingLogFiles = t.getValue();
           Map<String, List<String>> fileIdtologFiles = new HashMap<>();
           missingLogFiles.forEach(logFile -> {
-            String fileId = FSUtils.getFileIdFromLogPath(new 
Path(fullPartitionPath, logFile));
+            String fileId = HadoopFSUtils.getFileIdFromLogPath(new 
Path(fullPartitionPath, logFile));
             if (!fileIdtologFiles.containsKey(fileId)) {
               fileIdtologFiles.put(fileId, new ArrayList<>());
             }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index a90e0db6a06..069ec9e5b74 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -108,8 +109,8 @@ public class ConsistentBucketIndexUtils {
    */
   public static Option<HoodieConsistentHashingMetadata> 
loadMetadata(HoodieTable table, String partition) {
     HoodieTableMetaClient metaClient = table.getMetaClient();
-    Path metadataPath = 
FSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(), 
partition);
-    Path partitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePathV2().toString(),
 partition);
+    Path metadataPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(),
 partition);
+    Path partitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePathV2().toString(),
 partition);
     try {
       Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
         String filename = fileStatus.getPath().getName();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 5b414c79b53..ce4a4a46506 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -55,6 +55,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -516,7 +517,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
       // TODO we can actually deduce file size purely from AppendResult (based 
on offset and size
       //      of the appended block)
       for (WriteStatus status : statuses) {
-        long logFileSize = FSUtils.getFileSize(fs, new 
Path(config.getBasePath(), status.getStat().getPath()));
+        long logFileSize = HadoopFSUtils.getFileSize(fs, new 
Path(config.getBasePath(), status.getStat().getPath()));
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index eec73b8ed9d..2397c2ea30f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -40,6 +39,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -216,7 +216,7 @@ public class HoodieCDCLogger implements Closeable {
       for (Path cdcAbsPath : cdcAbsPaths) {
         String cdcFileName = cdcAbsPath.getName();
         String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? 
cdcFileName : partitionPath + "/" + cdcFileName;
-        stats.put(cdcPath, FSUtils.getFileSize(fs, cdcAbsPath));
+        stats.put(cdcPath, HadoopFSUtils.getFileSize(fs, cdcAbsPath));
       }
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to get cdc write stat", e);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index aaad39c3453..07f30c1e3fa 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.StoragePath;
@@ -244,7 +245,7 @@ public class HoodieCreateHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     stat.setPath(new StoragePath(config.getBasePath()), path);
     stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
 
-    long fileSize = FSUtils.getFileSize(fs, new Path(path.toUri()));
+    long fileSize = HadoopFSUtils.getFileSize(fs, new Path(path.toUri()));
     stat.setTotalWriteBytes(fileSize);
     stat.setFileSizeInBytes(fileSize);
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 8f310899174..ed18a2f0055 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -42,6 +42,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -430,7 +431,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
       fileWriter.close();
       fileWriter = null;
 
-      long fileSizeInBytes = FSUtils.getFileSize(fs, new 
Path(newFilePath.toUri()));
+      long fileSizeInBytes = HadoopFSUtils.getFileSize(fs, new 
Path(newFilePath.toUri()));
       HoodieWriteStat stat = writeStatus.getStat();
 
       stat.setTotalWriteBytes(fileSizeInBytes);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
index 3e9e6b42a61..6ced75a2a3b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
@@ -19,11 +19,10 @@
 package org.apache.hudi.table.action.bootstrap;
 
 import org.apache.hudi.avro.model.HoodieFileStatus;
-import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -67,9 +66,9 @@ public class BootstrapUtils {
 
     for (FileStatus topLevelStatus: topLevelStatuses) {
       if (topLevelStatus.isFile() && 
filePathFilter.accept(topLevelStatus.getPath())) {
-        String relativePath = FSUtils.getRelativePartitionPath(basePath, 
topLevelStatus.getPath().getParent());
+        String relativePath = HadoopFSUtils.getRelativePartitionPath(basePath, 
topLevelStatus.getPath().getParent());
         Integer level = (int) relativePath.chars().filter(ch -> ch == 
'/').count();
-        HoodieFileStatus hoodieFileStatus = 
FileStatusUtils.fromFileStatus(topLevelStatus);
+        HoodieFileStatus hoodieFileStatus = 
HadoopFSUtils.fromFileStatus(topLevelStatus);
         result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
       } else if (topLevelStatus.isDirectory() && 
metaPathFilter.accept(topLevelStatus.getPath())) {
         subDirectories.add(topLevelStatus.getPath().toString());
@@ -86,9 +85,9 @@ public class BootstrapUtils {
         while (itr.hasNext()) {
           FileStatus status = itr.next();
           if (pathFilter.accept(status.getPath())) {
-            String relativePath = FSUtils.getRelativePartitionPath(new 
Path(basePathStr), status.getPath().getParent());
+            String relativePath = HadoopFSUtils.getRelativePartitionPath(new 
Path(basePathStr), status.getPath().getParent());
             Integer level = (int) relativePath.chars().filter(ch -> ch == 
'/').count();
-            HoodieFileStatus hoodieFileStatus = 
FileStatusUtils.fromFileStatus(status);
+            HoodieFileStatus hoodieFileStatus = 
HadoopFSUtils.fromFileStatus(status);
             res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
           }
         }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
index ca3f9b1c570..856b56ca321 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -291,7 +292,7 @@ public class BaseRollbackHelper implements Serializable {
         // lets map each log file to partition path and log file name
         .mapToPair((SerializablePairFunction<String, String, String>) t -> {
           Path logFilePath = new Path(basePathStr, t);
-          String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePathStr), logFilePath.getParent());
+          String partitionPath = HadoopFSUtils.getRelativePartitionPath(new 
Path(basePathStr), logFilePath.getParent());
           return Pair.of(partitionPath, logFilePath.getName());
         })
         // lets group by partition path and collect it as log file list per 
partition path
@@ -356,7 +357,7 @@ public class BaseRollbackHelper implements Serializable {
       String basePath = metaClient.getBasePathV2().toString();
       try {
         Path fullDeletePath = new Path(fileToDelete);
-        String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
+        String partitionPath = HadoopFSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
         boolean isDeleted = true;
         if (doDelete) {
           try {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 1fd054b9407..e6eca0924bd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
@@ -225,7 +226,7 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
       }
       return false;
     };
-    return 
fs.listStatus(FSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(), 
partitionPath), filter);
+    return 
fs.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(),
 partitionPath), filter);
   }
 
   private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, 
String partitionPath, String basePath,
@@ -286,7 +287,7 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
   }
 
   private static Path[] listFilesToBeDeleted(String basePath, String 
partitionPath) {
-    return new Path[] {FSUtils.constructAbsolutePathInHadoopPath(basePath, 
partitionPath)};
+    return new Path[] 
{HadoopFSUtils.constructAbsolutePathInHadoopPath(basePath, partitionPath)};
   }
 
   private static Path[] getFilesFromCommitMetadata(String basePath, 
HoodieCommitMetadata commitMetadata, String partitionPath) {
@@ -300,7 +301,7 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
       if (path.toString().endsWith(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
-      } else if (FSUtils.isLogFile(path)) {
+      } else if (HadoopFSUtils.isLogFile(path)) {
         // Since the baseCommitTime is the only commit for new log files, it's 
okay here
         String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(new 
StoragePath(path.toUri()));
         return commit.equals(fileCommitTime);
@@ -356,7 +357,7 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
       FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
       String fileId = writeStat.getFileId();
       String latestBaseInstant = latestFileSlice.getBaseInstantTime();
-      Path fullLogFilePath = 
FSUtils.constructAbsolutePathInHadoopPath(table.getConfig().getBasePath(), 
writeStat.getPath());
+      Path fullLogFilePath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(table.getConfig().getBasePath(),
 writeStat.getPath());
       Map<String, Long> logFilesWithBlocksToRollback = 
Collections.singletonMap(
           fullLogFilePath.toString(), writeStat.getTotalWriteBytes() > 0 ? 
writeStat.getTotalWriteBytes() : 1L);
       hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, 
fileId, latestBaseInstant,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 5ba61b38803..f1648ede24a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
@@ -80,17 +81,17 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> 
implements BaseRollbackPlan
         IOType type = IOType.valueOf(typeStr);
         String fileNameWithPartitionToRollback = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
         Path fullFilePathToRollback = new Path(basePath, 
fileNameWithPartitionToRollback);
-        String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePathToRollback.getParent());
+        String partitionPath = HadoopFSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePathToRollback.getParent());
         switch (type) {
           case MERGE:
           case CREATE:
             String fileId = null;
             String baseInstantTime = null;
-            if (FSUtils.isBaseFile(fullFilePathToRollback)) {
+            if (HadoopFSUtils.isBaseFile(fullFilePathToRollback)) {
               HoodieBaseFile baseFileToDelete = new 
HoodieBaseFile(fullFilePathToRollback.toString());
               fileId = baseFileToDelete.getFileId();
               baseInstantTime = baseFileToDelete.getCommitTime();
-            } else if (FSUtils.isLogFile(fullFilePathToRollback)) {
+            } else if (HadoopFSUtils.isLogFile(fullFilePathToRollback)) {
               throw new HoodieRollbackException("Log files should have only 
APPEND as IOTypes " + fullFilePathToRollback);
             }
             Objects.requireNonNull(fileId, "Cannot find valid fileId from 
path: " + fullFilePathToRollback);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index 6319928f8de..cdbafc7c101 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -22,13 +22,13 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -54,7 +54,7 @@ public class HoodieSparkBootstrapSchemaProvider extends 
HoodieBootstrapSchemaPro
   @Override
   protected Schema getBootstrapSourceSchema(HoodieEngineContext context, 
List<Pair<String, List<HoodieFileStatus>>> partitions) {
     Schema schema = partitions.stream().flatMap(p -> 
p.getValue().stream()).map(fs -> {
-          Path filePath = FileStatusUtils.toPath(fs.getPath());
+          Path filePath = HadoopFSUtils.toPath(fs.getPath());
           String extension = FSUtils.getFileExtension(filePath.getName());
           if (PARQUET.getFileExtension().equals(extension)) {
             return getBootstrapSourceSchemaParquet(writeConfig, context, 
filePath);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java
index 9fa9e1cbf73..98bbe9b1aba 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java
@@ -18,13 +18,14 @@
 
 package org.apache.hudi.table.action.bootstrap;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.avro.model.HoodieFileStatus;
+
+import org.apache.hadoop.fs.Path;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
@@ -32,7 +33,7 @@ import static 
org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 public class MetadataBootstrapHandlerFactory {
 
   public static BootstrapMetadataHandler getMetadataHandler(HoodieWriteConfig 
config, HoodieTable table, HoodieFileStatus srcFileStatus) {
-    Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
+    Path sourceFilePath = HadoopFSUtils.toPath(srcFileStatus.getPath());
 
     String extension = FSUtils.getFileExtension(sourceFilePath.toString());
     if (ORC.getFileExtension().equals(extension)) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 26b3efed499..723fa6b1614 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -71,6 +71,7 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -861,9 +862,9 @@ public class TestCleaner extends HoodieCleanerTestBase {
         
version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).size());
     
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(),
         
version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).size());
-    assertEquals(new 
Path(FSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePath(), 
partition1), fileName1).toString(),
+    assertEquals(new 
StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), 
partition1), fileName1).toString(),
         
version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).get(0).getFilePath());
-    assertEquals(new 
Path(FSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePath(), 
partition2), fileName2).toString(),
+    assertEquals(new 
StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), 
partition2), fileName2).toString(),
         
version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).get(0).getFilePath());
 
     // Downgrade and verify version 1 plan
@@ -1341,7 +1342,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
       String fileName = Paths.get(fullPath).getFileName().toString();
       return Pair.of(FSUtils.getFileId(fileName), 
FSUtils.getCommitTime(fileName));
     });
-    Stream<Pair<String, String>> stream2 = 
paths.stream().filter(rtFilePredicate).map(path -> 
Pair.of(FSUtils.getFileIdFromLogPath(new Path(path)),
+    Stream<Pair<String, String>> stream2 = 
paths.stream().filter(rtFilePredicate).map(path -> 
Pair.of(HadoopFSUtils.getFileIdFromLogPath(new Path(path)),
         FSUtils.getBaseCommitTimeFromLogPath(new StoragePath(path))));
     return Stream.concat(stream1, stream2);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 263a4d5314f..8e85208af6f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -47,6 +47,7 @@ import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -368,7 +369,7 @@ public class 
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       // inject a fake log file to test marker file for log file
       HoodieDeltaWriteStat correctWriteStat =
           (HoodieDeltaWriteStat) 
statuses.map(WriteStatus::getStat).take(1).get(0);
-      assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
+      assertTrue(HadoopFSUtils.isLogFile(new 
Path(correctWriteStat.getPath())));
       HoodieLogFile correctLogFile = new 
HoodieLogFile(correctWriteStat.getPath());
       String correctWriteToken = 
FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/FileStatusUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/FileStatusUtils.java
index 026af3714b1..5593b2f7f53 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/FileStatusUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/FileStatusUtils.java
@@ -18,62 +18,14 @@
 
 package org.apache.hudi.common.bootstrap;
 
-import org.apache.hudi.avro.model.HoodieFSPermission;
 import org.apache.hudi.avro.model.HoodieFileStatus;
-import org.apache.hudi.avro.model.HoodiePath;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import java.io.IOException;
-
 /**
  * Helper functions around FileStatus and HoodieFileStatus.
  */
 public class FileStatusUtils {
-
-  public static Path toPath(HoodiePath path) {
-    if (null == path) {
-      return null;
-    }
-    return new Path(path.getUri());
-  }
-
-  public static HoodiePath fromPath(Path path) {
-    if (null == path) {
-      return null;
-    }
-    return HoodiePath.newBuilder().setUri(path.toString()).build();
-  }
-  
-  public static FsPermission toFSPermission(HoodieFSPermission fsPermission) {
-    if (null == fsPermission) {
-      return null;
-    }
-    FsAction userAction = fsPermission.getUserAction() != null ? 
FsAction.valueOf(fsPermission.getUserAction()) : null;
-    FsAction grpAction = fsPermission.getGroupAction() != null ? 
FsAction.valueOf(fsPermission.getGroupAction()) : null;
-    FsAction otherAction =
-        fsPermission.getOtherAction() != null ? 
FsAction.valueOf(fsPermission.getOtherAction()) : null;
-    boolean stickyBit = fsPermission.getStickyBit() != null ? 
fsPermission.getStickyBit() : false;
-    return new FsPermission(userAction, grpAction, otherAction, stickyBit);
-  }
-
-  public static HoodieFSPermission fromFSPermission(FsPermission fsPermission) 
{
-    if (null == fsPermission) {
-      return null;
-    }
-    String userAction = fsPermission.getUserAction() != null ? 
fsPermission.getUserAction().name() : null;
-    String grpAction = fsPermission.getGroupAction() != null ? 
fsPermission.getGroupAction().name() : null;
-    String otherAction = fsPermission.getOtherAction() != null ? 
fsPermission.getOtherAction().name() : null;
-    return 
HoodieFSPermission.newBuilder().setUserAction(userAction).setGroupAction(grpAction)
-        
.setOtherAction(otherAction).setStickyBit(fsPermission.getStickyBit()).build();
-  }
-
   public static StoragePathInfo toStoragePathInfo(HoodieFileStatus fileStatus) 
{
     if (null == fileStatus) {
       return null;
@@ -84,42 +36,4 @@ public class FileStatusUtils {
         fileStatus.getIsDir() == null ? false : fileStatus.getIsDir(),
         fileStatus.getBlockReplication().shortValue(), 
fileStatus.getBlockSize(), fileStatus.getModificationTime());
   }
-
-  public static HoodieFileStatus fromFileStatus(FileStatus fileStatus) {
-    if (null == fileStatus) {
-      return null;
-    }
-
-    HoodieFileStatus fStatus = new HoodieFileStatus();
-    try {
-      fStatus.setPath(fromPath(fileStatus.getPath()));
-      fStatus.setLength(fileStatus.getLen());
-      fStatus.setIsDir(fileStatus.isDirectory());
-      fStatus.setBlockReplication((int) fileStatus.getReplication());
-      fStatus.setBlockSize(fileStatus.getBlockSize());
-      fStatus.setModificationTime(fileStatus.getModificationTime());
-      fStatus.setAccessTime(fileStatus.getModificationTime());
-      fStatus.setSymlink(fileStatus.isSymlink() ? 
fromPath(fileStatus.getSymlink()) : null);
-      safeReadAndSetMetadata(fStatus, fileStatus);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
-    }
-    return fStatus;
-  }
-
-  /**
-   * Used to safely handle FileStatus calls which might fail on some 
FileSystem implementation.
-   * (DeprecatedLocalFileSystem)
-   */
-  private static void safeReadAndSetMetadata(HoodieFileStatus fStatus, 
FileStatus fileStatus) {
-    try {
-      fStatus.setOwner(fileStatus.getOwner());
-      fStatus.setGroup(fileStatus.getGroup());
-      fStatus.setPermission(fromFSPermission(fileStatus.getPermission()));
-    } catch (IllegalArgumentException ie) {
-      // Deprecated File System (testing) does not work well with this call
-      // skipping
-    }
-  }
-
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index b2f87b9f01a..ec13861b849 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathFilter;
@@ -42,9 +43,6 @@ import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.StorageSchemes;
 import org.apache.hudi.storage.inline.InLineFSUtils;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
@@ -71,8 +69,6 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.storage.HoodieStorageUtils.getStorageConfWithCopy;
-
 /**
  * Utility functions related to accessing the file storage.
  */
@@ -85,8 +81,8 @@ public class FSUtils {
   public static final Pattern LOG_FILE_PATTERN =
       
Pattern.compile("^\\.(.+)_(.*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(.cdc)?)?");
   public static final Pattern PREFIX_BY_FILE_ID_PATTERN = 
Pattern.compile("^(.+)-(\\d+)");
-  private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
 
+  private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
   private static final String LOG_FILE_EXTENSION = ".log";
 
   private static final StoragePathFilter ALLOW_ALL_FILTER = file -> true;
@@ -102,17 +98,6 @@ public class FSUtils {
     return storage.exists(new StoragePath(path + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME));
   }
 
-  /**
-   * Makes path qualified w/ {@link FileSystem}'s URI
-   *
-   * @param fs   instance of {@link FileSystem} path belongs to
-   * @param path path to be qualified
-   * @return qualified path, prefixed w/ the URI of the target FS object 
provided
-   */
-  public static Path makeQualified(FileSystem fs, Path path) {
-    return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-  }
-
   /**
    * Makes path qualified with {@link HoodieStorage}'s URI.
    *
@@ -159,10 +144,6 @@ public class FSUtils {
     }
   }
 
-  public static long getFileSize(FileSystem fs, Path path) throws IOException {
-    return fs.getFileStatus(path).getLen();
-  }
-
   public static long getFileSize(HoodieStorage storage, StoragePath path) 
throws IOException {
     return storage.getPathInfo(path).getLength();
   }
@@ -188,13 +169,6 @@ public class FSUtils {
     return datePartitions;
   }
 
-  /**
-   * Given a base partition and a partition path, return relative path of 
partition path to the base path.
-   */
-  public static String getRelativePartitionPath(Path basePath, Path 
fullPartitionPath) {
-    return getRelativePartitionPath(new StoragePath(basePath.toUri()), new 
StoragePath(fullPartitionPath.toUri()));
-  }
-
   public static String getRelativePartitionPath(StoragePath basePath, 
StoragePath fullPartitionPath) {
     basePath = getPathWithoutSchemeAndAuthority(basePath);
     fullPartitionPath = getPathWithoutSchemeAndAuthority(fullPartitionPath);
@@ -316,7 +290,7 @@ public class FSUtils {
             result.add(Option.of(filenameToFileStatusMap.get(fileName)));
           } else {
             if (!ignoreMissingFiles) {
-              throw new FileNotFoundException("File not found: " + new 
Path(partitionPathIncludeBasePath.toString(), fileName));
+              throw new FileNotFoundException("File not found: " + new 
StoragePath(partitionPathIncludeBasePath, fileName));
             }
             result.add(Option.empty());
           }
@@ -387,18 +361,6 @@ public class FSUtils {
     return matcher.group(3);
   }
 
-  /**
-   * Get the first part of the file name in the log file. That will be the 
fileId. Log file do not have instantTime in
-   * the file name.
-   */
-  public static String getFileIdFromLogPath(Path path) {
-    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
-    if (!matcher.find()) {
-      throw new InvalidHoodiePathException(path.toString(), "LogFile");
-    }
-    return matcher.group(1);
-  }
-
   public static String getFileIdFromLogPath(StoragePath path) {
     Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.find()) {
@@ -407,16 +369,6 @@ public class FSUtils {
     return matcher.group(1);
   }
 
-  /**
-   * Check if the file is a base file or a log file. Then get the fileId 
appropriately.
-   */
-  public static String getFileIdFromFilePath(Path filePath) {
-    if (FSUtils.isLogFile(filePath)) {
-      return FSUtils.getFileIdFromLogPath(filePath);
-    }
-    return FSUtils.getFileId(filePath.getName());
-  }
-
   public static String getFileIdFromFilePath(StoragePath filePath) {
     if (FSUtils.isLogFile(filePath)) {
       return FSUtils.getFileIdFromLogPath(filePath);
@@ -506,11 +458,6 @@ public class FSUtils {
     return HoodieLogFile.LOG_FILE_PREFIX + suffix;
   }
 
-  public static boolean isBaseFile(Path path) {
-    String extension = getFileExtension(path.getName());
-    return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension);
-  }
-
   public static boolean isBaseFile(StoragePath path) {
     String extension = getFileExtension(path.getName());
     return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension);
@@ -522,10 +469,6 @@ public class FSUtils {
         ? InLineFSUtils.getOuterFilePathFromInlinePath(logPath).getName() : 
logPath.getName());
   }
 
-  public static boolean isLogFile(Path logPath) {
-    return isLogFile(new StoragePath(logPath.getName()));
-  }
-
   public static boolean isLogFile(String fileName) {
     if (fileName.contains(LOG_FILE_EXTENSION)) {
       Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
@@ -534,40 +477,10 @@ public class FSUtils {
     return false;
   }
 
-  /**
-   * Returns true if the given path is a Base file or a Log file.
-   */
-  public static boolean isDataFile(Path path) {
-    return isBaseFile(path) || isLogFile(path);
-  }
-
   public static boolean isDataFile(StoragePath path) {
     return isBaseFile(path) || isLogFile(path);
   }
 
-  /**
-   * Get the names of all the base and log files in the given partition path.
-   */
-  public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path 
partitionPath) throws IOException {
-    final Set<String> validFileExtensions = 
Arrays.stream(HoodieFileFormat.values())
-        
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
-    final String logFileExtension = 
HoodieFileFormat.HOODIE_LOG.getFileExtension();
-
-    try {
-      return Arrays.stream(fs.listStatus(partitionPath, path -> {
-        String extension = FSUtils.getFileExtension(path.getName());
-        return validFileExtensions.contains(extension) || 
path.getName().contains(logFileExtension);
-      })).filter(FileStatus::isFile).toArray(FileStatus[]::new);
-    } catch (IOException e) {
-      // return empty FileStatus if partition does not exist already
-      if (!fs.exists(partitionPath)) {
-        return new FileStatus[0];
-      } else {
-        throw e;
-      }
-    }
-  }
-
   public static List<StoragePathInfo> getAllDataFilesInPartition(HoodieStorage 
storage,
                                                                  StoragePath 
partitionPath)
       throws IOException {
@@ -632,7 +545,7 @@ public class FSUtils {
    * computes the next log version for the specified fileId in the partition 
path.
    */
   public static int computeNextLogVersion(HoodieStorage storage, StoragePath 
partitionPath, final String fileId,
-      final String logFileExtension, final String baseCommitTime) throws 
IOException {
+                                          final String logFileExtension, final 
String baseCommitTime) throws IOException {
     Option<Pair<Integer, String>> currentVersionWithWriteToken =
         getLatestLogVersion(storage, partitionPath, fileId, logFileExtension, 
baseCommitTime);
     // handle potential overflow
@@ -640,29 +553,6 @@ public class FSUtils {
         : HoodieLogFile.LOGFILE_BASE_VERSION;
   }
 
-  /**
-   * When a file was opened and the task died without closing the stream, 
another task executor cannot open because the
-   * existing lease will be active. We will try to recover the lease, from 
HDFS. If a data node went down, it takes
-   * about 10 minutes for the lease to be recovered. But if the client dies, 
this should be instant.
-   */
-  public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, 
final Path p)
-      throws IOException, InterruptedException {
-    LOG.info("Recover lease on dfs file {}", p);
-    // initiate the recovery
-    boolean recovered = false;
-    for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; 
nbAttempt++) {
-      LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
-      recovered = dfs.recoverLease(p);
-      if (recovered) {
-        break;
-      }
-      // Sleep for 1 second before trying again. Typically it takes about 2-3 
seconds to recover
-      // under default settings
-      Thread.sleep(1000);
-    }
-    return recovered;
-  }
-
   public static void createPathIfNotExists(HoodieStorage storage, StoragePath 
partitionPath)
       throws IOException {
     if (!storage.exists(partitionPath)) {
@@ -674,10 +564,6 @@ public class FSUtils {
     return sizeInBytes / (1024 * 1024);
   }
 
-  public static Path constructAbsolutePathInHadoopPath(String basePath, String 
relativePartitionPath) {
-    return new Path(constructAbsolutePath(basePath, 
relativePartitionPath).toUri());
-  }
-
   public static StoragePath constructAbsolutePath(String basePath, String 
relativePartitionPath) {
     if (StringUtils.isNullOrEmpty(relativePartitionPath)) {
       return new StoragePath(basePath);
@@ -714,13 +600,6 @@ public class FSUtils {
     return filePathWithPartition.substring(offset);
   }
 
-  /**
-   * Get DFS full partition path (e.g. hdfs://ip-address:8020:/<absolute path>)
-   */
-  public static String getDFSFullPartitionPath(FileSystem fs, Path 
fullPartitionPath) {
-    return fs.getUri() + fullPartitionPath.toUri().getRawPath();
-  }
-
   /**
    * Helper to filter out paths under metadata folder when running 
fs.globStatus.
    *
@@ -766,27 +645,6 @@ public class FSUtils {
     return false;
   }
 
-  public static <T> Map<String, T> parallelizeFilesProcess(
-      HoodieEngineContext hoodieEngineContext,
-      FileSystem fs,
-      int parallelism,
-      SerializableFunction<Pair<String, StorageConfiguration<Configuration>>, 
T> pairFunction,
-      List<String> subPaths) {
-    Map<String, T> result = new HashMap<>();
-    if (subPaths.size() > 0) {
-      StorageConfiguration<Configuration> conf = 
getStorageConfWithCopy(fs.getConf());
-      int actualParallelism = Math.min(subPaths.size(), parallelism);
-
-      hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(),
-          "Parallel listing paths " + String.join(",", subPaths));
-
-      result = hoodieEngineContext.mapToPair(subPaths,
-          subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new 
ImmutablePair<>(subPath, conf))),
-          actualParallelism);
-    }
-    return result;
-  }
-
   /**
    * Processes sub-path in parallel.
    *
@@ -847,61 +705,17 @@ public class FSUtils {
    */
   public static boolean deleteSubPath(String subPathStr, 
StorageConfiguration<?> conf, boolean recursive) {
     try {
-      Path subPath = new Path(subPathStr);
-      FileSystem fileSystem = 
subPath.getFileSystem(conf.unwrapAs(Configuration.class));
-      return fileSystem.delete(subPath, recursive);
+      StoragePath subPath = new StoragePath(subPathStr);
+      HoodieStorage storage = HoodieStorageUtils.getStorage(subPath, conf);
+      if (recursive) {
+        return storage.deleteDirectory(subPath);
+      }
+      return storage.deleteFile(subPath);
     } catch (IOException e) {
       throw new HoodieIOException(e.getMessage(), e);
     }
   }
 
-  /**
-   * Lists file status at a certain level in the directory hierarchy.
-   * <p>
-   * E.g., given "/tmp/hoodie_table" as the rootPath, and 3 as the expected 
level,
-   * this method gives back the {@link FileStatus} of all files under
-   * "/tmp/hoodie_table/[*]/[*]/[*]/" folders.
-   *
-   * @param hoodieEngineContext {@link HoodieEngineContext} instance.
-   * @param fs                  {@link FileSystem} instance.
-   * @param rootPath            Root path for the file listing.
-   * @param expectLevel         Expected level of directory hierarchy for 
files to be added.
-   * @param parallelism         Parallelism for the file listing.
-   * @return A list of file status of files at the level.
-   */
-
-  public static List<FileStatus> getFileStatusAtLevel(
-      HoodieEngineContext hoodieEngineContext, FileSystem fs, Path rootPath,
-      int expectLevel, int parallelism) {
-    List<String> levelPaths = new ArrayList<>();
-    List<FileStatus> result = new ArrayList<>();
-    levelPaths.add(rootPath.toString());
-
-    for (int i = 0; i <= expectLevel; i++) {
-      result = FSUtils.parallelizeFilesProcess(hoodieEngineContext, fs, 
parallelism,
-          pairOfSubPathAndConf -> {
-            Path path = new Path(pairOfSubPathAndConf.getKey());
-            try {
-              FileSystem fileSystem = 
path.getFileSystem(pairOfSubPathAndConf.getValue().unwrap());
-              return Arrays.stream(fileSystem.listStatus(path))
-                .collect(Collectors.toList());
-            } catch (IOException e) {
-              throw new HoodieIOException("Failed to list " + path, e);
-            }
-          },
-          levelPaths)
-          .values().stream()
-          .flatMap(list -> list.stream()).collect(Collectors.toList());
-      if (i < expectLevel) {
-        levelPaths = result.stream()
-            .filter(FileStatus::isDirectory)
-            .map(fileStatus -> fileStatus.getPath().toString())
-            .collect(Collectors.toList());
-      }
-    }
-    return result;
-  }
-
   public static List<StoragePathInfo> getAllDataPathInfo(HoodieStorage 
storage, StoragePath path)
       throws IOException {
     List<StoragePathInfo> pathInfoList = new ArrayList<>();
@@ -917,6 +731,29 @@ public class FSUtils {
     return pathInfoList;
   }
 
+  /**
+   * When a file was opened and the task died without closing the stream, 
another task executor cannot open because the
+   * existing lease will be active. We will try to recover the lease, from 
HDFS. If a data node went down, it takes
+   * about 10 minutes for the lease to be recovered. But if the client dies, 
this should be instant.
+   */
+  public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, 
final Path p)
+      throws IOException, InterruptedException {
+    LOG.info("Recover lease on dfs file {}", p);
+    // initiate the recovery
+    boolean recovered = false;
+    for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; 
nbAttempt++) {
+      LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
+      recovered = dfs.recoverLease(p);
+      if (recovered) {
+        break;
+      }
+      // Sleep for 1 second before trying again. Typically it takes about 2-3 
seconds to recover
+      // under default settings
+      Thread.sleep(1000);
+    }
+    return recovered;
+  }
+
   /**
    * Serializable function interface.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 643b123d596..3866069d437 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.storage.StorageConfiguration;
 
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,18 +150,6 @@ public class ConfigUtils {
     return sb.toString();
   }
 
-  /**
-   * Creates a Hadoop {@link Configuration} instance with the properties.
-   *
-   * @param props {@link Properties} instance.
-   * @return Hadoop {@link Configuration} instance.
-   */
-  public static Configuration createHadoopConf(Properties props) {
-    Configuration hadoopConf = new Configuration();
-    props.stringPropertyNames().forEach(k -> hadoopConf.set(k, 
props.getProperty(k)));
-    return hadoopConf;
-  }
-
   /**
    * Case-insensitive resolution of input enum name to the enum type
    */
@@ -301,32 +288,6 @@ public class ConfigUtils {
     return Option.empty();
   }
 
-  /**
-   * Gets the raw value for a {@link ConfigProperty} config from Hadoop 
configuration. The key and
-   * alternative keys are used to fetch the config.
-   *
-   * @param conf           Configs in Hadoop {@link Configuration}.
-   * @param configProperty {@link ConfigProperty} config to fetch.
-   * @return {@link Option} of value if the config exists; empty {@link 
Option} otherwise.
-   */
-  public static Option<String> getRawValueWithAltKeys(Configuration conf,
-                                                      ConfigProperty<?> 
configProperty) {
-    String value = conf.get(configProperty.key());
-    if (value != null) {
-      return Option.of(value);
-    }
-    for (String alternative : configProperty.getAlternatives()) {
-      String altValue = conf.get(alternative);
-      if (altValue != null) {
-        LOG.warn(String.format("The configuration key '%s' has been deprecated 
"
-                + "and may be removed in the future. Please use the new key 
'%s' instead.",
-            alternative, configProperty.key()));
-        return Option.of(altValue);
-      }
-    }
-    return Option.empty();
-  }
-
   /**
    * Gets the String value for a {@link ConfigProperty} config from 
properties. The key and
    * alternative keys are used to fetch the config. If the config is not 
found, an
@@ -453,24 +414,6 @@ public class ConfigUtils {
     return rawValue.map(v -> 
Boolean.parseBoolean(v.toString())).orElse(defaultValue);
   }
 
-  /**
-   * Gets the boolean value for a {@link ConfigProperty} config from Hadoop 
configuration. The key and
-   * alternative keys are used to fetch the config. The default value of 
{@link ConfigProperty}
-   * config, if exists, is returned if the config is not found in the 
configuration.
-   *
-   * @param conf           Configs in Hadoop {@link Configuration}.
-   * @param configProperty {@link ConfigProperty} config to fetch.
-   * @return boolean value if the config exists; default boolean value if the 
config does not exist
-   * and there is default value defined in the {@link ConfigProperty} config; 
{@code false} otherwise.
-   */
-  public static boolean getBooleanWithAltKeys(Configuration conf,
-                                              ConfigProperty<?> 
configProperty) {
-    Option<String> rawValue = getRawValueWithAltKeys(conf, configProperty);
-    boolean defaultValue = configProperty.hasDefaultValue()
-        ? Boolean.parseBoolean(configProperty.defaultValue().toString()) : 
false;
-    return rawValue.map(Boolean::parseBoolean).orElse(defaultValue);
-  }
-
   /**
    * Gets the integer value for a {@link ConfigProperty} config from 
properties. The key and
    * alternative keys are used to fetch the config. The default value of 
{@link ConfigProperty}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 1f959ba1b58..5728dd8d36c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.common.config.ConfigProperty;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
@@ -28,7 +30,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class TestConfigUtils {
-
+  public static final ConfigProperty<String> TEST_BOOLEAN_CONFIG_PROPERTY = 
ConfigProperty
+      .key("hoodie.test.boolean.config")
+      .defaultValue("true")
+      .withAlternatives("hudi.test.boolean.config")
+      .markAdvanced()
+      .withDocumentation("Testing boolean config.");
+  
   @Test
   public void testToMapSucceeds() {
     Map<String, String> expectedMap = new HashMap<>();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index ac4d2ea7783..b925a895628 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -53,7 +53,6 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
@@ -429,7 +428,7 @@ public class ITTestHoodieFlinkCompactor {
           try {
             
storage.listDirectEntries(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(),
 partition))
                 .stream()
-                .filter(f -> FSUtils.isBaseFile(new Path(f.getPath().toUri())))
+                .filter(f -> FSUtils.isBaseFile(f.getPath()))
                 .forEach(f -> {
                   HoodieBaseFile baseFile = new HoodieBaseFile(f);
                   assertFalse(fileIdCommitTimeSet.contains(
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HadoopConfigUtils.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HadoopConfigUtils.java
new file mode 100644
index 00000000000..9f1347872e2
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HadoopConfigUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.config.ConfigProperty;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Utils on Hadoop {@link Configuration}.
+ */
+public class HadoopConfigUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HadoopConfigUtils.class);
+
+  /**
+   * Creates a Hadoop {@link Configuration} instance with the properties.
+   *
+   * @param props {@link Properties} instance.
+   * @return Hadoop {@link Configuration} instance.
+   */
+  public static Configuration createHadoopConf(Properties props) {
+    Configuration hadoopConf = new Configuration();
+    props.stringPropertyNames().forEach(k -> hadoopConf.set(k, 
props.getProperty(k)));
+    return hadoopConf;
+  }
+
+  /**
+   * Gets the raw value for a {@link ConfigProperty} config from Hadoop 
configuration. The key and
+   * alternative keys are used to fetch the config.
+   *
+   * @param conf           Configs in Hadoop {@link Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return {@link Option} of value if the config exists; empty {@link 
Option} otherwise.
+   */
+  public static Option<String> getRawValueWithAltKeys(Configuration conf,
+                                                      ConfigProperty<?> 
configProperty) {
+    String value = conf.get(configProperty.key());
+    if (value != null) {
+      return Option.of(value);
+    }
+    for (String alternative : configProperty.getAlternatives()) {
+      String altValue = conf.get(alternative);
+      if (altValue != null) {
+        LOG.warn(String.format("The configuration key '%s' has been deprecated 
"
+                + "and may be removed in the future. Please use the new key 
'%s' instead.",
+            alternative, configProperty.key()));
+        return Option.of(altValue);
+      }
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Gets the boolean value for a {@link ConfigProperty} config from Hadoop 
configuration. The key and
+   * alternative keys are used to fetch the config. The default value of 
{@link ConfigProperty}
+   * config, if exists, is returned if the config is not found in the 
configuration.
+   *
+   * @param conf           Configs in Hadoop {@link Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return boolean value if the config exists; default boolean value if the 
config does not exist
+   * and there is default value defined in the {@link ConfigProperty} config; 
{@code false} otherwise.
+   */
+  public static boolean getBooleanWithAltKeys(Configuration conf,
+                                              ConfigProperty<?> 
configProperty) {
+    Option<String> rawValue = getRawValueWithAltKeys(conf, configProperty);
+    boolean defaultValue = configProperty.hasDefaultValue()
+        ? Boolean.parseBoolean(configProperty.defaultValue().toString()) : 
false;
+    return rawValue.map(Boolean::parseBoolean).orElse(defaultValue);
+  }
+}
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index 3119ee8c0c0..ca504577b40 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -19,7 +19,18 @@
 
 package org.apache.hudi.hadoop.fs;
 
+import org.apache.hudi.avro.model.HoodieFSPermission;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.avro.model.HoodiePath;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
@@ -33,12 +44,22 @@ import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 
 /**
  * Utility functions related to accessing the file storage on Hadoop.
@@ -264,4 +285,258 @@ public class HadoopFSUtils {
         HoodieWrapperFileSystem.class.getName());
     return returnConf;
   }
+
+  public static Path toPath(HoodiePath path) {
+    if (null == path) {
+      return null;
+    }
+    return new Path(path.getUri());
+  }
+
+  public static HoodiePath fromPath(Path path) {
+    if (null == path) {
+      return null;
+    }
+    return HoodiePath.newBuilder().setUri(path.toString()).build();
+  }
+
+  public static FsPermission toFSPermission(HoodieFSPermission fsPermission) {
+    if (null == fsPermission) {
+      return null;
+    }
+    FsAction userAction = fsPermission.getUserAction() != null ? 
FsAction.valueOf(fsPermission.getUserAction()) : null;
+    FsAction grpAction = fsPermission.getGroupAction() != null ? 
FsAction.valueOf(fsPermission.getGroupAction()) : null;
+    FsAction otherAction =
+        fsPermission.getOtherAction() != null ? 
FsAction.valueOf(fsPermission.getOtherAction()) : null;
+    boolean stickyBit = fsPermission.getStickyBit() != null ? 
fsPermission.getStickyBit() : false;
+    return new FsPermission(userAction, grpAction, otherAction, stickyBit);
+  }
+
+  public static HoodieFSPermission fromFSPermission(FsPermission fsPermission) 
{
+    if (null == fsPermission) {
+      return null;
+    }
+    String userAction = fsPermission.getUserAction() != null ? 
fsPermission.getUserAction().name() : null;
+    String grpAction = fsPermission.getGroupAction() != null ? 
fsPermission.getGroupAction().name() : null;
+    String otherAction = fsPermission.getOtherAction() != null ? 
fsPermission.getOtherAction().name() : null;
+    return 
HoodieFSPermission.newBuilder().setUserAction(userAction).setGroupAction(grpAction)
+        
.setOtherAction(otherAction).setStickyBit(fsPermission.getStickyBit()).build();
+  }
+
  /**
   * Converts a Hadoop {@link FileStatus} into its Avro-model {@link HoodieFileStatus}
   * representation. Returns {@code null} for a {@code null} input.
   *
   * @param fileStatus the Hadoop file status, possibly {@code null}.
   * @return the populated Avro record, or {@code null} when the input is {@code null}.
   * @throws HoodieIOException if reading the symlink target fails.
   */
  public static HoodieFileStatus fromFileStatus(FileStatus fileStatus) {
    if (null == fileStatus) {
      return null;
    }

    HoodieFileStatus fStatus = new HoodieFileStatus();
    try {
      fStatus.setPath(fromPath(fileStatus.getPath()));
      fStatus.setLength(fileStatus.getLen());
      fStatus.setIsDir(fileStatus.isDirectory());
      fStatus.setBlockReplication((int) fileStatus.getReplication());
      fStatus.setBlockSize(fileStatus.getBlockSize());
      fStatus.setModificationTime(fileStatus.getModificationTime());
      // NOTE(review): access time is deliberately copied from the modification
      // time rather than fileStatus.getAccessTime() — presumably because some
      // filesystems do not report access time reliably; confirm before
      // depending on the access-time field.
      fStatus.setAccessTime(fileStatus.getModificationTime());
      fStatus.setSymlink(fileStatus.isSymlink() ? fromPath(fileStatus.getSymlink()) : null);
      // Owner/group/permission are read separately since some FileSystem
      // implementations throw on these accessors.
      safeReadAndSetMetadata(fStatus, fileStatus);
    } catch (IOException ioe) {
      throw new HoodieIOException(ioe.getMessage(), ioe);
    }
    return fStatus;
  }
+
  /**
   * Copies owner, group, and permission from a Hadoop {@link FileStatus} onto
   * the Avro {@link HoodieFileStatus}, tolerating FileSystem implementations
   * whose metadata accessors fail (e.g. DeprecatedLocalFileSystem used in tests).
   *
   * @param fStatus    the Avro record to populate.
   * @param fileStatus the Hadoop status to read from.
   */
  private static void safeReadAndSetMetadata(HoodieFileStatus fStatus, FileStatus fileStatus) {
    try {
      fStatus.setOwner(fileStatus.getOwner());
      fStatus.setGroup(fileStatus.getGroup());
      fStatus.setPermission(fromFSPermission(fileStatus.getPermission()));
    } catch (IllegalArgumentException ie) {
      // Deprecated File System (testing) does not work well with this call;
      // intentionally skipping — the fields are simply left unset.
    }
  }
+
+  public static long getFileSize(FileSystem fs, Path path) throws IOException {
+    return fs.getFileStatus(path).getLen();
+  }
+
+  /**
+   * Given a base partition and a partition path, return relative path of 
partition path to the base path.
+   */
+  public static String getRelativePartitionPath(Path basePath, Path 
fullPartitionPath) {
+    return FSUtils.getRelativePartitionPath(new StoragePath(basePath.toUri()), 
new StoragePath(fullPartitionPath.toUri()));
+  }
+
+  /**
+   * Get the first part of the file name in the log file. That will be the 
fileId. Log file do not have instantTime in
+   * the file name.
+   */
+  public static String getFileIdFromLogPath(Path path) {
+    Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(path.getName());
+    if (!matcher.find()) {
+      throw new InvalidHoodiePathException(path.toString(), "LogFile");
+    }
+    return matcher.group(1);
+  }
+
+  /**
+   * Check if the file is a base file of a log file. Then get the fileId 
appropriately.
+   */
+  public static String getFileIdFromFilePath(Path filePath) {
+    if (isLogFile(filePath)) {
+      return getFileIdFromLogPath(filePath);
+    }
+    return FSUtils.getFileId(filePath.getName());
+  }
+
+  public static boolean isBaseFile(Path path) {
+    String extension = FSUtils.getFileExtension(path.getName());
+    return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension);
+  }
+
+  public static boolean isLogFile(Path logPath) {
+    return FSUtils.isLogFile(new StoragePath(logPath.getName()));
+  }
+
+  /**
+   * Returns true if the given path is a Base file or a Log file.
+   */
+  public static boolean isDataFile(Path path) {
+    return isBaseFile(path) || isLogFile(path);
+  }
+
+  /**
+   * Get the names of all the base and log files in the given partition path.
+   */
+  public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path 
partitionPath) throws IOException {
+    final Set<String> validFileExtensions = 
Arrays.stream(HoodieFileFormat.values())
+        
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
+    final String logFileExtension = 
HoodieFileFormat.HOODIE_LOG.getFileExtension();
+
+    try {
+      return Arrays.stream(fs.listStatus(partitionPath, path -> {
+        String extension = FSUtils.getFileExtension(path.getName());
+        return validFileExtensions.contains(extension) || 
path.getName().contains(logFileExtension);
+      })).filter(FileStatus::isFile).toArray(FileStatus[]::new);
+    } catch (IOException e) {
+      // return empty FileStatus if partition does not exist already
+      if (!fs.exists(partitionPath)) {
+        return new FileStatus[0];
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  public static Path constructAbsolutePathInHadoopPath(String basePath, String 
relativePartitionPath) {
+    return new Path(FSUtils.constructAbsolutePath(basePath, 
relativePartitionPath).toUri());
+  }
+
+  /**
+   * Get DFS full partition path (e.g. hdfs://ip-address:8020:/<absolute path>)
+   */
+  public static String getDFSFullPartitionPath(FileSystem fs, Path 
fullPartitionPath) {
+    return fs.getUri() + fullPartitionPath.toUri().getRawPath();
+  }
+
+  public static <T> Map<String, T> parallelizeFilesProcess(
+      HoodieEngineContext hoodieEngineContext,
+      FileSystem fs,
+      int parallelism,
+      FSUtils.SerializableFunction<Pair<String, 
StorageConfiguration<Configuration>>, T> pairFunction,
+      List<String> subPaths) {
+    Map<String, T> result = new HashMap<>();
+    if (subPaths.size() > 0) {
+      StorageConfiguration<Configuration> conf = 
HoodieStorageUtils.getStorageConfWithCopy(fs.getConf());
+      int actualParallelism = Math.min(subPaths.size(), parallelism);
+
+      hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(),
+          "Parallel listing paths " + String.join(",", subPaths));
+
+      result = hoodieEngineContext.mapToPair(subPaths,
+          subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new 
ImmutablePair<>(subPath, conf))),
+          actualParallelism);
+    }
+    return result;
+  }
+
  /**
   * Lists file status at a certain level in the directory hierarchy.
   * <p>
   * E.g., given "/tmp/hoodie_table" as the rootPath, and 3 as the expected level,
   * this method gives back the {@link FileStatus} of all files under
   * "/tmp/hoodie_table/[*]/[*]/[*]/" folders.
   *
   * @param hoodieEngineContext {@link HoodieEngineContext} instance.
   * @param fs                  {@link FileSystem} instance.
   * @param rootPath            Root path for the file listing.
   * @param expectLevel         Expected level of directory hierarchy for files to be added.
   * @param parallelism         Parallelism for the file listing.
   * @return A list of file status of files at the level.
   */
  public static List<FileStatus> getFileStatusAtLevel(
      HoodieEngineContext hoodieEngineContext, FileSystem fs, Path rootPath,
      int expectLevel, int parallelism) {
    List<String> levelPaths = new ArrayList<>();
    List<FileStatus> result = new ArrayList<>();
    levelPaths.add(rootPath.toString());

    // Breadth-first descent: iteration i lists every entry directly under the
    // level-i directories; after the loop, `result` holds the listing at
    // `expectLevel`.
    for (int i = 0; i <= expectLevel; i++) {
      result = parallelizeFilesProcess(hoodieEngineContext, fs, parallelism,
          pairOfSubPathAndConf -> {
            Path path = new Path(pairOfSubPathAndConf.getKey());
            try {
              // Re-resolve the FileSystem from the per-task storage conf.
              FileSystem fileSystem = path.getFileSystem(pairOfSubPathAndConf.getValue().unwrap());
              return Arrays.stream(fileSystem.listStatus(path))
                  .collect(Collectors.toList());
            } catch (IOException e) {
              throw new HoodieIOException("Failed to list " + path, e);
            }
          },
          levelPaths)
          .values().stream()
          .flatMap(list -> list.stream()).collect(Collectors.toList());
      // Only directories are descended into for the next level; files found
      // above the expected level are discarded.
      if (i < expectLevel) {
        levelPaths = result.stream()
            .filter(FileStatus::isDirectory)
            .map(fileStatus -> fileStatus.getPath().toString())
            .collect(Collectors.toList());
      }
    }
    return result;
  }
+
  /**
   * Deletes the given paths in parallel on the table's filesystem.
   *
   * @param metaClient   meta client providing the storage/filesystem of the table.
   * @param paths        fully-qualified paths to delete (non-recursively).
   * @param context      engine context used for the parallel execution.
   * @param parallelism  requested parallelism; capped at the number of paths.
   * @param ignoreFailed when true, a failed deletion is logged and reported as
   *                     success instead of raising an exception.
   * @return map from path to deletion outcome; a path that does not exist is
   * reported as {@code true}.
   */
  public static Map<String, Boolean> deleteFilesParallelize(
      HoodieTableMetaClient metaClient,
      List<String> paths,
      HoodieEngineContext context,
      int parallelism,
      boolean ignoreFailed) {
    return HadoopFSUtils.parallelizeFilesProcess(context,
        (FileSystem) metaClient.getStorage().getFileSystem(),
        parallelism,
        pairOfSubPathAndConf -> {
          Path file = new Path(pairOfSubPathAndConf.getKey());
          try {
            // NOTE(review): the FileSystem is re-derived inside the lambda
            // rather than captured from the outer scope — presumably so the
            // closure stays serializable for distributed engines; confirm
            // before refactoring to hoist it.
            FileSystem fs = (FileSystem) metaClient.getStorage().getFileSystem();
            if (fs.exists(file)) {
              return fs.delete(file, false);
            }
            // Missing file: nothing to delete, treated as success.
            return true;
          } catch (IOException e) {
            if (!ignoreFailed) {
              throw new HoodieIOException("Failed to delete : " + file, e);
            } else {
              // Best-effort mode: log and report success.
              LOG.warn("Ignore failed deleting : " + file);
              return true;
            }
          }
        },
        paths);
  }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 3822535e7db..076cef09074 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -201,10 +201,10 @@ public class TestFSUtils extends HoodieCommonTestHarness {
   public void testGetRelativePartitionPath() {
     Path basePath = new Path("/test/apache");
     Path partitionPath = new Path("/test/apache/hudi/sub");
-    assertEquals("hudi/sub", FSUtils.getRelativePartitionPath(basePath, 
partitionPath));
+    assertEquals("hudi/sub", HadoopFSUtils.getRelativePartitionPath(basePath, 
partitionPath));
 
     Path nonPartitionPath = new Path("/test/something/else");
-    assertThrows(IllegalArgumentException.class, () -> 
FSUtils.getRelativePartitionPath(basePath, nonPartitionPath));
+    assertThrows(IllegalArgumentException.class, () -> 
HadoopFSUtils.getRelativePartitionPath(basePath, nonPartitionPath));
   }
 
   @ParameterizedTest
@@ -534,7 +534,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     StoragePath hoodieTempDir = getHoodieTempDir();
     HoodieStorage storage = metaClient.getStorage();
     prepareTestDirectory(storage, hoodieTempDir);
-    List<FileStatus> fileStatusList = FSUtils.getFileStatusAtLevel(
+    List<FileStatus> fileStatusList = HadoopFSUtils.getFileStatusAtLevel(
         new HoodieLocalEngineContext(storage.getConf()), (FileSystem) 
storage.getFileSystem(),
         new Path(baseUri), 3, 2);
     assertEquals(CollectionUtils.createImmutableSet(
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index fb06fb743d9..f575a3cc877 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -25,7 +25,6 @@ import org.apache.hudi.avro.model.HoodieFSPermission;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.avro.model.HoodiePath;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
-import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
 import org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex;
 import org.apache.hudi.common.fs.FSUtils;
@@ -59,6 +58,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
@@ -427,10 +427,10 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
                                  Option<BaseFile> bootstrapBaseFile, boolean 
testBootstrap) {
     if (testBootstrap) {
       assertTrue(bootstrapBaseFile.isPresent());
-      assertEquals(FileStatusUtils.toPath(srcFileStatus.getPath()),
+      assertEquals(HadoopFSUtils.toPath(srcFileStatus.getPath()),
           new Path(bootstrapBaseFile.get().getPath()));
       assertEquals(srcFileStatus.getPath(),
-          FileStatusUtils.fromPath(new 
Path(bootstrapBaseFile.get().getPath())));
+          HadoopFSUtils.fromPath(new Path(bootstrapBaseFile.get().getPath())));
       assertEquals(srcFileStatus.getModificationTime(),
           new 
Long(bootstrapBaseFile.get().getPathInfo().getModificationTime()));
       assertEquals(srcFileStatus.getBlockSize(), new 
Long(bootstrapBaseFile.get().getPathInfo().getBlockSize()));
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 1192004c9e9..49f499756bb 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -64,6 +64,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
@@ -444,7 +445,7 @@ public class HoodieTestTable {
   private Map<String, Long> getWrittenLogFiles(String instant, 
Map.Entry<String, List<String>> entry) {
     Map<String, Long> writtenLogFiles = new HashMap<>();
     for (String fileName : entry.getValue()) {
-      if (FSUtils.isLogFile(new Path(fileName))) {
+      if (HadoopFSUtils.isLogFile(new Path(fileName))) {
         if (testTableState.getPartitionToLogFileInfoMap(instant) != null
             && 
testTableState.getPartitionToLogFileInfoMap(instant).containsKey(entry.getKey()))
 {
           List<Pair<String, Integer[]>> fileInfos = 
testTableState.getPartitionToLogFileInfoMap(instant).get(entry.getKey());
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHadoopConfigUtils.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHadoopConfigUtils.java
new file mode 100644
index 00000000000..01733d1b75d
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHadoopConfigUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.hudi.common.util.HadoopConfigUtils.getBooleanWithAltKeys;
+import static 
org.apache.hudi.common.util.HadoopConfigUtils.getRawValueWithAltKeys;
+import static 
org.apache.hudi.common.util.TestConfigUtils.TEST_BOOLEAN_CONFIG_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHadoopConfigUtils {
+  @Test
+  public void testGetRawValueWithAltKeysFromHadoopConf() {
+    Configuration conf = new Configuration();
+    assertEquals(Option.empty(), getRawValueWithAltKeys(conf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    boolean setValue = 
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+    conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+    assertEquals(Option.of(String.valueOf(setValue)),
+        getRawValueWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    conf = new Configuration();
+    conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0), 
setValue);
+    assertEquals(Option.of(String.valueOf(setValue)),
+        getRawValueWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+  }
+
+  @Test
+  public void testGetBooleanWithAltKeysFromHadoopConf() {
+    Configuration conf = new Configuration();
+    
assertEquals(Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue()),
+        getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    boolean setValue = 
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+    conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+    assertEquals(setValue,
+        getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    conf = new Configuration();
+    conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0), 
setValue);
+    assertEquals(setValue,
+        getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+  }
+}
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 48fd4bc29c9..d6a62f3a061 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.hadoop;
 
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -208,7 +207,7 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
             fsView = 
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
                 metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf));
           }
-          String partition = FSUtils.getRelativePartitionPath(new 
Path(metaClient.getBasePath()), folder);
+          String partition = HadoopFSUtils.getRelativePartitionPath(new 
Path(metaClient.getBasePath()), folder);
           List<HoodieBaseFile> latestFiles = 
fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
           // populate the cache
           if (!hoodiePathCache.containsKey(folder.toString())) {
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index 2aee2edf135..7e74171c3f9 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
@@ -79,7 +78,7 @@ public class HoodieParquetRealtimeInputFormat extends 
HoodieParquetInputFormat {
         + ", Ids :" + 
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
 
     // for log only split, set the parquet reader as empty.
-    if (FSUtils.isLogFile(realtimeSplit.getPath())) {
+    if (HadoopFSUtils.isLogFile(realtimeSplit.getPath())) {
       return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new 
HoodieEmptyRecordReader(realtimeSplit, jobConf));
     }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 33d25f1c21f..9db661daf81 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -184,7 +184,7 @@ public class HoodieInputFormatUtils {
       return getInputFormat(HoodieFileFormat.HFILE, realtime, conf);
     }
     // now we support read log file, try to find log file
-    if (FSUtils.isLogFile(new Path(path)) && realtime) {
+    if (HadoopFSUtils.isLogFile(new Path(path)) && realtime) {
       return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf);
     }
     throw new HoodieIOException("Hoodie InputFormat not implemented for base 
file of type " + extension);
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
index 30ac00b0b0d..15a935bbd9e 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -60,7 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath;
 import static 
org.apache.hudi.hadoop.testutils.InputFormatTestUtil.writeDataBlockToLogFile;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 7c0507bace6..c05e6e9d128 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -21,7 +21,6 @@ package org.apache.hudi.hadoop.realtime;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -211,7 +210,7 @@ public class TestHoodieRealtimeRecordReader {
     // TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change
     // logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3));
     FileSlice fileSlice =
-        new FileSlice(partitioned ? FSUtils.getRelativePartitionPath(new 
Path(basePath.toString()),
+        new FileSlice(partitioned ? HadoopFSUtils.getRelativePartitionPath(new 
Path(basePath.toString()),
             new Path(partitionDir.getAbsolutePath())) : "default", 
baseInstant, "fileid0");
     logVersionsWithAction.forEach(logVersionWithAction -> {
       try {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index c857b61e0a4..c1bd8be8f57 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -24,7 +24,6 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -34,6 +33,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -63,7 +63,7 @@ public abstract class SparkFullBootstrapDataProviderBase 
extends FullRecordBoots
   public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String 
sourceBasePath,
                                                     List<Pair<String, 
List<HoodieFileStatus>>> partitionPathsWithFiles, HoodieWriteConfig config) {
     String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
-        .flatMap(f -> f.stream().map(fs -> 
FileStatusUtils.toPath(fs.getPath()).toString()))
+        .flatMap(f -> f.stream().map(fs -> 
HadoopFSUtils.toPath(fs.getPath()).toString()))
         .toArray(String[]::new);
 
     // NOTE: "basePath" option is required for spark to discover the partition 
column
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
index b9119364715..dacfdef6739 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -52,7 +52,7 @@ class ShowInvalidParquetProcedure extends BaseProcedure with 
ProcedureBuilder {
     val storageConf = 
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())
     javaRdd.rdd.map(part => {
       val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
-      FSUtils.getAllDataFilesInPartition(fs, 
FSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
+        HadoopFSUtils.getAllDataFilesInPartition(fs, 
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
     }).flatMap(_.toList)
       .filter(status => {
         val filePath = status.getPath
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 2b371cf1db3..feec6c78ab2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -27,7 +27,6 @@ import 
org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
 import 
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
 import 
org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
@@ -175,7 +174,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     } else {
       df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
     }
-    String filePath = 
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(
+    String filePath = 
HadoopFSUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(
             metaClient, (FileSystem) metaClient.getStorage().getFileSystem(),
             srcPath, context).stream().findAny().map(p -> 
p.getValue().stream().findAny())
             .orElse(null).get().getPath()).toString();
@@ -513,7 +512,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     @Override
     public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String 
sourceBasePath,
         List<Pair<String, List<HoodieFileStatus>>> partitionPaths, 
HoodieWriteConfig config) {
-      String filePath = 
FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> 
p.getValue().stream())
+      String filePath = HadoopFSUtils.toPath(partitionPaths.stream().flatMap(p 
-> p.getValue().stream())
           .findAny().get().getPath()).toString();
       ParquetFileReader reader = null;
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
@@ -531,7 +530,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
   private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc,
       List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema 
writerSchema) {
     List<Pair<String, Path>> fullFilePathsWithPartition = 
partitionPaths.stream().flatMap(p -> p.getValue().stream()
-        .map(x -> Pair.of(p.getKey(), 
FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList());
+        .map(x -> Pair.of(p.getKey(), 
HadoopFSUtils.toPath(x.getPath())))).collect(Collectors.toList());
     return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> {
       try {
         Configuration conf = jsc.hadoopConfiguration();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index fe105efff42..45921cd9568 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -28,7 +28,6 @@ import 
org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
 import 
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
 import 
org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -50,6 +49,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.io.hadoop.OrcReaderIterator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
@@ -155,7 +155,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     } else {
       df.write().format("orc").mode(SaveMode.Overwrite).save(srcPath);
     }
-    String filePath = 
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, 
(FileSystem) metaClient.getStorage().getFileSystem(),
+    String filePath = 
HadoopFSUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, 
(FileSystem) metaClient.getStorage().getFileSystem(),
         srcPath, context).stream().findAny().map(p -> 
p.getValue().stream().findAny())
         .orElse(null).get().getPath()).toString();
     Reader orcReader =
@@ -401,12 +401,12 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String 
sourceBasePath,
                                                       List<Pair<String, 
List<HoodieFileStatus>>> partitionPaths,  HoodieWriteConfig config) {
       String[] filePaths = partitionPaths.stream().map(Pair::getValue)
-          .flatMap(f -> f.stream().map(fs -> 
FileStatusUtils.toPath(fs.getPath()).toString()))
+          .flatMap(f -> f.stream().map(fs -> 
HadoopFSUtils.toPath(fs.getPath()).toString()))
           .toArray(String[]::new);
 
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
 
-      String filePath = 
FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> 
p.getValue().stream())
+      String filePath = HadoopFSUtils.toPath(partitionPaths.stream().flatMap(p 
-> p.getValue().stream())
           .findAny().get().getPath()).toString();
       try {
         Reader orcReader = OrcFile.createReader(
@@ -425,7 +425,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
   private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc,
                                                           List<Pair<String, 
List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) {
     List<Pair<String, Path>> fullFilePathsWithPartition = 
partitionPaths.stream().flatMap(p -> p.getValue().stream()
-        .map(x -> Pair.of(p.getKey(), 
FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList());
+        .map(x -> Pair.of(p.getKey(), 
HadoopFSUtils.toPath(x.getPath())))).collect(Collectors.toList());
     return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> {
       try {
         Configuration conf = jsc.hadoopConfiguration();
diff --git 
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
 
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
index 0c4305017f1..74fbe94aef7 100644
--- 
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
+++ 
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
@@ -18,11 +18,11 @@
 
 package org.apache.hudi.sync.adb;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
@@ -323,7 +323,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
             if (!StringUtils.isNullOrEmpty(str)) {
               List<String> values = 
partitionValueExtractor.extractPartitionValuesInPath(str);
               Path storagePartitionPath =
-                  
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 String.join("/", values));
+                  
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 String.join("/", values));
               String fullStoragePartitionPath =
                   
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
               partitions.put(values, fullStoragePartitionPath);
@@ -359,7 +359,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
         .append(tableName).append("`").append(" add if not exists ");
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partition);
+      Path partitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partition);
       String fullPartitionPathStr = 
config.generateAbsolutePathStr(partitionPath);
       sqlBuilder.append("  partition (").append(partitionClause).append(") 
location '")
           .append(fullPartitionPathStr).append("' ");
@@ -376,7 +376,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
     String alterTable = "alter table `" + tableName + "`";
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partition);
+      Path partitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partition);
       String fullPartitionPathStr = 
config.generateAbsolutePathStr(partitionPath);
       String changePartition = alterTable + " add if not exists partition (" + 
partitionClause
           + ") location '" + fullPartitionPathStr + "'";
@@ -455,13 +455,13 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient 
{
     List<PartitionEvent> events = new ArrayList<>();
     for (String storagePartition : partitionStoragePartitions) {
       Path storagePartitionPath =
-          
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 storagePartition);
+          
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 storagePartition);
       String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
       if (config.getBoolean(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING)) {
         String partition = String.join("/", storagePartitionValues);
-        storagePartitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partition);
+        storagePartitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partition);
         fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       }
       if (!storagePartitionValues.isEmpty()) {
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index b5471079524..c3db79fb368 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hive.ddl;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.util.HivePartitionUtil;
@@ -205,7 +206,7 @@ public class HMSDDLExecutor implements DDLExecutor {
           partitionSd.setOutputFormat(sd.getOutputFormat());
           partitionSd.setSerdeInfo(sd.getSerdeInfo());
           String fullPartitionPath =
-              
FSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH),
 x).toString();
+              
FSUtils.constructAbsolutePath(syncConfig.getString(META_SYNC_BASE_PATH), 
x).toString();
           List<String> partitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(x);
           partitionSd.setLocation(fullPartitionPath);
           partitionList.add(new Partition(partitionValues, databaseName, 
tableName, 0, 0, partitionSd, null));
@@ -229,10 +230,10 @@ public class HMSDDLExecutor implements DDLExecutor {
     try {
       StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
       List<Partition> partitionList = changedPartitions.stream().map(partition 
-> {
-        Path partitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH),
 partition);
+        Path partitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH),
 partition);
         String partitionScheme = partitionPath.toUri().getScheme();
         String fullPartitionPath = 
StorageSchemes.HDFS.getScheme().equals(partitionScheme)
-            ? 
FSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), 
partitionPath) : partitionPath.toString();
+            ? 
HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), 
partitionPath) : partitionPath.toString();
         List<String> partitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(partition);
         StorageDescriptor partitionSd = sd.deepCopy();
         partitionSd.setLocation(fullPartitionPath);
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index 194f99705bf..156353f0e24 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
@@ -162,7 +163,7 @@ public abstract class QueryBasedDDLExecutor implements 
DDLExecutor {
     for (int i = 0; i < partitions.size(); i++) {
       String partitionClause = getPartitionClause(partitions.get(i));
       String fullPartitionPath =
-          
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partitions.get(i)).toString();
+          FSUtils.constructAbsolutePath(config.getString(META_SYNC_BASE_PATH), 
partitions.get(i)).toString();
       alterSQL.append("  PARTITION (").append(partitionClause).append(") 
LOCATION '").append(fullPartitionPath)
           .append("' ");
       if ((i + 1) % batchSyncPartitionNum == 0) {
@@ -211,10 +212,10 @@ public abstract class QueryBasedDDLExecutor implements 
DDLExecutor {
     String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + 
HIVE_ESCAPE_CHARACTER;
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partition);
+      Path partitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 partition);
       String partitionScheme = partitionPath.toUri().getScheme();
       String fullPartitionPath = 
StorageSchemes.HDFS.getScheme().equals(partitionScheme)
-          ? FSUtils.getDFSFullPartitionPath(config.getHadoopFileSystem(), 
partitionPath) : partitionPath.toString();
+          ? 
HadoopFSUtils.getDFSFullPartitionPath(config.getHadoopFileSystem(), 
partitionPath) : partitionPath.toString();
       String changePartition =
           alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + 
fullPartitionPath + "'";
       changePartitions.add(changePartition);
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index f2c67bc22e5..136c9c4e636 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -82,9 +82,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath;
 import static 
org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
@@ -358,7 +358,7 @@ public class TestHiveSyncTool {
     // it and generate a partition update event for it.
     ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME
         + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '"
-        + FSUtils.constructAbsolutePathInHadoopPath(basePath, 
"2050/1/1").toString() + "'");
+        + FSUtils.constructAbsolutePath(basePath, "2050/1/1").toString() + 
"'");
 
     hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
     List<String> writtenPartitionsSince = 
hiveClient.getWrittenPartitionsSince(Option.empty(), Option.empty());
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index ffb82021213..03085cc9d9b 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -21,8 +21,8 @@ package org.apache.hudi.sync.common;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.ParquetTableSchemaResolver;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.ParquetTableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
@@ -162,7 +162,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     List<PartitionEvent> events = new ArrayList<>();
     for (String storagePartition : allPartitionsOnStorage) {
       Path storagePartitionPath =
-          
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 storagePartition);
+          
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 storagePartition);
       String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
@@ -206,7 +206,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     List<PartitionEvent> events = new ArrayList<>();
     for (String storagePartition : writtenPartitionsOnStorage) {
       Path storagePartitionPath =
-          
FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 storagePartition);
+          
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH),
 storagePartition);
       String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index e85324b7a77..35900fc75da 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HadoopConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -199,7 +199,7 @@ public class HoodieSyncConfig extends HoodieConfig {
   private Configuration hadoopConf;
 
   public HoodieSyncConfig(Properties props) {
-    this(props, ConfigUtils.createHadoopConf(props));
+    this(props, HadoopConfigUtils.createHadoopConf(props));
   }
 
   public HoodieSyncConfig(Properties props, Configuration hadoopConf) {
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
index 729807d1b9b..c614a7ae82b 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
@@ -18,7 +18,7 @@
 package org.apache.hudi.sync.common;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HadoopConfigUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,7 +35,7 @@ public abstract class HoodieSyncTool implements AutoCloseable 
{
   protected Configuration hadoopConf;
 
   public HoodieSyncTool(Properties props) {
-    this(props, ConfigUtils.createHadoopConf(props));
+    this(props, HadoopConfigUtils.createHadoopConf(props));
   }
 
   public HoodieSyncTool(Properties props, Configuration hadoopConf) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableUtils.java
index 7647f93c899..6f1be367c2e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableUtils.java
@@ -38,7 +38,7 @@ public class HoodieDataTableUtils {
       String basePath) throws IOException {
     List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths()
         .stream().map(partitionPath ->
-            FSUtils.constructAbsolutePathInHadoopPath(basePath, 
partitionPath).toString())
+            FSUtils.constructAbsolutePath(basePath, partitionPath).toString())
         .collect(Collectors.toList());
     return 
tableMetadata.getAllFilesInPartitions(allPartitionPaths).values().stream()
         .map(fileStatuses ->
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index 94dde8ce41e..f7fdbcae64c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -22,7 +22,6 @@ package org.apache.hudi.utilities;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
@@ -290,13 +289,13 @@ public class HoodieRepairTool {
       HoodieEngineContext context, String basePathStr, int expectedLevel, int 
parallelism) {
     FileSystem fs = HadoopFSUtils.getFs(basePathStr, context.getStorageConf());
     Path basePath = new Path(basePathStr);
-    return FSUtils.getFileStatusAtLevel(
+    return HadoopFSUtils.getFileStatusAtLevel(
             context, fs, basePath, expectedLevel, parallelism).stream()
         .filter(fileStatus -> {
           if (!fileStatus.isFile()) {
             return false;
           }
-          return FSUtils.isDataFile(fileStatus.getPath());
+          return HadoopFSUtils.isDataFile(fileStatus.getPath());
         })
         .map(fileStatus -> fileStatus.getPath().toString())
         .collect(Collectors.toList());
@@ -414,7 +413,7 @@ public class HoodieRepairTool {
     List<String> relativeFilePaths = listFilesFromBasePath(
         context, backupPathStr, partitionLevels, cfg.parallelism).stream()
         .map(filePath ->
-            FSUtils.getRelativePartitionPath(new Path(backupPathStr), new 
Path(filePath)))
+            HadoopFSUtils.getRelativePartitionPath(new Path(backupPathStr), 
new Path(filePath)))
         .collect(Collectors.toList());
     return restoreFiles(relativeFilePaths);
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 36050c926ab..9b3dcc6ffe1 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -138,7 +138,7 @@ public class HoodieSnapshotCopier implements Serializable {
       context.foreach(filesToCopy, tuple -> {
         String partition = tuple._1();
         Path sourceFilePath = new Path(tuple._2());
-        Path toPartitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(outputDir, partition);
+        Path toPartitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(outputDir, partition);
         FileSystem ifs = HadoopFSUtils.getFs(baseDir, 
storageConf.unwrapCopyAs(Configuration.class));
 
         if (!ifs.exists(toPartitionPath)) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index fd80d37a8d2..c6c8a393bbd 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -228,7 +228,7 @@ public class HoodieSnapshotExporter {
     context.foreach(partitionAndFileList, partitionAndFile -> {
       String partition = partitionAndFile.getLeft();
       Path sourceFilePath = new Path(partitionAndFile.getRight());
-      Path toPartitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(cfg.targetOutputPath, partition);
+      Path toPartitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(cfg.targetOutputPath, 
partition);
       FileSystem executorSourceFs = HadoopFSUtils.getFs(cfg.sourceBasePath, 
storageConf.newInstance());
       FileSystem executorOutputFs = HadoopFSUtils.getFs(cfg.targetOutputPath, 
storageConf.newInstance());
 

Reply via email to