This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b59b202f49 [HUDI-7743] Improve StoragePath usages (#11189)
4b59b202f49 is described below

commit 4b59b202f491780eeab8c67ce5f4b6506200c7b4
Author: Jon Vexler <[email protected]>
AuthorDate: Sun May 12 23:18:09 2024 -0400

    [HUDI-7743] Improve StoragePath usages (#11189)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/cli/commands/ArchivedCommitsCommand.java  | 19 ++++++++--------
 .../apache/hudi/cli/commands/RepairsCommand.java   | 11 ++++-----
 .../org/apache/hudi/cli/commands/TableCommand.java | 14 ++++--------
 .../apache/hudi/cli/commands/TimelineCommand.java  |  4 ++--
 .../apache/hudi/cli/commands/TestTableCommand.java |  4 ++--
 .../cli/commands/TestUpgradeDowngradeCommand.java  |  4 ++--
 .../hudi/client/heartbeat/HeartbeatUtils.java      |  2 +-
 .../client/heartbeat/HoodieHeartbeatClient.java    |  4 ++--
 .../utils/LegacyArchivedMetaEntryReader.java       |  2 +-
 .../index/bucket/ConsistentBucketIndexUtils.java   |  8 +++----
 .../org/apache/hudi/io/HoodieKeyLookupHandle.java  |  3 +--
 .../java/org/apache/hudi/io/HoodieReadHandle.java  |  5 ++---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  2 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  3 +--
 .../java/org/apache/hudi/table/HoodieTable.java    |  4 ++--
 .../table/action/commit/HoodieMergeHelper.java     |  3 +--
 .../table/action/index/RunIndexActionExecutor.java |  3 +--
 .../BaseHoodieFunctionalIndexClient.java           |  3 +--
 .../rollback/ListingBasedRollbackStrategy.java     |  6 ++---
 .../hudi/table/upgrade/UpgradeDowngrade.java       |  6 ++---
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  2 +-
 .../apache/hudi/io/FlinkWriteHandleFactory.java    |  4 +++-
 .../io/storage/row/HoodieRowDataCreateHandle.java  |  7 ++++--
 .../row/HoodieRowDataFileWriterFactory.java        |  4 ++--
 .../org/apache/hudi/table/HoodieJavaTable.java     |  5 ++---
 .../client/utils/SparkMetadataWriterUtils.java     |  5 +++--
 .../index/bloom/HoodieFileProbingFunction.java     |  3 +--
 .../org/apache/hudi/table/HoodieSparkTable.java    |  5 ++---
 .../functional/TestHoodieBackedMetadata.java       |  4 ++--
 .../TestCopyOnWriteRollbackActionExecutor.java     |  2 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |  4 ++--
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   | 16 ++++++-------
 .../common/config/HoodieFunctionalIndexConfig.java |  2 +-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  2 +-
 .../common/heartbeat/HoodieHeartbeatUtils.java     |  2 +-
 .../hudi/common/table/HoodieTableConfig.java       |  8 +++----
 .../hudi/common/table/HoodieTableMetaClient.java   |  6 ++---
 .../table/timeline/HoodieActiveTimeline.java       |  4 ++--
 .../hudi/common/table/timeline/LSMTimeline.java    |  2 +-
 .../view/HoodieTablePreCommitFileSystemView.java   |  2 +-
 .../org/apache/hudi/common/util/ConfigUtils.java   |  2 +-
 .../index/secondary/SecondaryIndexManager.java     |  7 +++---
 .../io/FileBasedInternalSchemaStorageManager.java  |  5 ++---
 .../metadata/FileSystemBackedTableMetadata.java    |  2 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  4 ++--
 .../hudi/sink/bootstrap/BootstrapOperator.java     |  3 +--
 .../java/org/apache/hudi/util/StreamerUtil.java    |  2 +-
 .../hudi/sink/bucket/ITTestBucketStreamWrite.java  |  2 +-
 .../apache/hudi/table/format/TestInputFormat.java  |  2 +-
 .../common/config/DFSPropertiesConfiguration.java  |  2 +-
 .../common/bootstrap/index/TestBootstrapIndex.java |  3 +--
 .../fs/TestFSUtilsWithRetryWrapperEnable.java      |  8 +++----
 .../hudi/common/table/TestHoodieTableConfig.java   | 26 +++++++++++-----------
 .../common/table/TestHoodieTableMetaClient.java    |  2 +-
 .../table/view/TestHoodieTableFileSystemView.java  |  6 ++---
 .../table/view/TestIncrementalFSViewSync.java      |  2 +-
 .../hadoop/HoodieCopyOnWriteTableInputFormat.java  |  4 ++--
 .../hudi/hadoop/HoodieHFileRecordReader.java       |  3 ++-
 .../hudi/hadoop/HoodieROTablePathFilter.java       |  8 ++++---
 .../apache/hudi/hadoop/SchemaEvolutionContext.java |  5 +++--
 .../HoodieMergeOnReadTableInputFormat.java         |  3 +--
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  8 ++++---
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  4 ++--
 .../reader/DFSHoodieDatasetInputReader.java        |  3 +--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 11 ++++-----
 .../org/apache/spark/sql/hudi/DedupeSparkJob.scala | 15 +++++++------
 .../procedures/ExportInstantsProcedure.scala       |  3 ++-
 .../RepairMigratePartitionMetaProcedure.scala      |  2 +-
 .../RepairOverwriteHoodiePropsProcedure.scala      |  5 +----
 .../apache/spark/sql/hudi/common/TestSqlConf.scala |  6 ++---
 .../TestUpgradeOrDowngradeProcedure.scala          |  2 +-
 .../MarkerBasedEarlyConflictDetectionRunnable.java |  2 +-
 72 files changed, 173 insertions(+), 188 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 921d12fb663..50e71f370db 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.HoodieStorageUtils;
@@ -105,19 +106,17 @@ public class ArchivedCommitsCommand {
               defaultValue = "false") final boolean headerOnly)
       throws IOException {
     System.out.println("===============> Showing only " + limit + " archived 
commits <===============");
-    String basePath = HoodieCLI.getTableMetaClient().getBasePath();
-    StoragePath archivePath = new StoragePath(
-        HoodieCLI.getTableMetaClient().getArchivePath() + 
"/.commits_.archive*");
-    if (folder != null && !folder.isEmpty()) {
-      archivePath = new StoragePath(basePath + "/.hoodie/" + folder);
-    }
-    List<StoragePathInfo> pathInfoList =
-        HoodieStorageUtils.getStorage(basePath, 
HoodieCLI.conf).globEntries(archivePath);
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    StoragePath archivePath = folder != null && !folder.isEmpty()
+        ? new StoragePath(metaClient.getMetaPath(), folder)
+        : new StoragePath(metaClient.getArchivePath(), ".commits_.archive*");
+    HoodieStorage storage = 
HoodieStorageUtils.getStorage(metaClient.getBasePathV2(), HoodieCLI.conf);
+    List<StoragePathInfo> pathInfoList = storage.globEntries(archivePath);
     List<Comparable[]> allStats = new ArrayList<>();
     for (StoragePathInfo pathInfo : pathInfoList) {
       // read the archived file
-      try (Reader reader = 
HoodieLogFormat.newReader(HoodieStorageUtils.getStorage(basePath, 
HoodieCLI.conf),
-          new HoodieLogFile(pathInfo.getPath()), 
HoodieArchivedMetaEntry.getClassSchema())) {
+      try (Reader reader = HoodieLogFormat.newReader(storage, new 
HoodieLogFile(pathInfo.getPath()),
+          HoodieArchivedMetaEntry.getClassSchema())) {
         List<IndexedRecord> readRecords = new ArrayList<>();
         // read the avro blocks
         while (reader.hasNext()) {
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 41a801c2f2a..683c6351e44 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -57,8 +57,6 @@ import java.util.stream.Collectors;
 
 import scala.collection.JavaConverters;
 
-import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
-
 /**
  * CLI command to display and trigger repair options.
  */
@@ -123,7 +121,7 @@ public class RepairsCommand {
         
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
     List<String> partitionPaths =
         FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.storage, 
client.getBasePath());
-    StoragePath basePath = new StoragePath(client.getBasePath());
+    StoragePath basePath = client.getBasePathV2();
     String[][] rows = new String[partitionPaths.size()][];
 
     int ind = 0;
@@ -163,8 +161,7 @@ public class RepairsCommand {
       newProps.load(fileInputStream);
     }
     Map<String, String> oldProps = client.getTableConfig().propsMap();
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), 
METAFOLDER_NAME);
-    HoodieTableConfig.create(client.getStorage(), metaPathDir, newProps);
+    HoodieTableConfig.create(client.getStorage(), client.getMetaPath(), 
newProps);
     // reload new props as checksum would have been added
     newProps =
         
HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
@@ -231,7 +228,7 @@ public class RepairsCommand {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     List<String> partitionPaths =
         FSUtils.getAllPartitionPaths(engineContext, client.getBasePath(), 
false);
-    StoragePath basePath = new StoragePath(client.getBasePath());
+    StoragePath basePath = client.getBasePathV2();
 
     String[][] rows = new String[partitionPaths.size()][];
     int ind = 0;
@@ -277,7 +274,7 @@ public class RepairsCommand {
 
     Properties props = new Properties();
     
props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), 
"true");
-    HoodieTableConfig.update(HoodieCLI.storage, new 
StoragePath(client.getMetaPath()), props);
+    HoodieTableConfig.update(HoodieCLI.storage, client.getMetaPath(), props);
 
     return HoodiePrintHelper.print(new String[] {
         HoodieTableHeaderFields.HEADER_PARTITION_PATH,
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index b2d7e07b330..ce67294cd77 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -36,7 +36,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.storage.StoragePath;
 import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 
 import org.apache.avro.Schema;
@@ -61,7 +60,6 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
@@ -212,8 +210,7 @@ public class TableCommand {
   public String recoverTableConfig() throws IOException {
     HoodieCLI.refreshTableMetadata();
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), 
METAFOLDER_NAME);
-    HoodieTableConfig.recover(client.getStorage(), metaPathDir);
+    HoodieTableConfig.recover(client.getStorage(), client.getMetaPath());
     return descTable();
   }
 
@@ -228,8 +225,7 @@ public class TableCommand {
     try (FileInputStream fileInputStream = new 
FileInputStream(updatePropsFilePath)) {
       updatedProps.load(fileInputStream);
     }
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), 
METAFOLDER_NAME);
-    HoodieTableConfig.update(client.getStorage(), metaPathDir, updatedProps);
+    HoodieTableConfig.update(client.getStorage(), client.getMetaPath(), 
updatedProps);
 
     HoodieCLI.refreshTableMetadata();
     Map<String, String> newProps = 
HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
@@ -244,8 +240,7 @@ public class TableCommand {
     Map<String, String> oldProps = client.getTableConfig().propsMap();
 
     Set<String> deleteConfigs = 
Arrays.stream(csConfigs.split(",")).collect(Collectors.toSet());
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), 
METAFOLDER_NAME);
-    HoodieTableConfig.delete(client.getStorage(), metaPathDir, deleteConfigs);
+    HoodieTableConfig.delete(client.getStorage(), client.getMetaPath(), 
deleteConfigs);
 
     HoodieCLI.refreshTableMetadata();
     Map<String, String> newProps = 
HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
@@ -366,8 +361,7 @@ public class TableCommand {
     updatedProps.putAll(oldProps);
     // change the table type to target type
     updatedProps.put(HoodieTableConfig.TYPE.key(), targetType);
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), 
METAFOLDER_NAME);
-    HoodieTableConfig.update(client.getStorage(), metaPathDir, updatedProps);
+    HoodieTableConfig.update(client.getStorage(), client.getMetaPath(), 
updatedProps);
 
     HoodieCLI.refreshTableMetadata();
     Map<String, String> newProps = 
HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
index 6dbba62af49..8cb6fb72180 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
@@ -174,10 +174,10 @@ public class TimelineCommand {
   }
 
   private Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> 
getInstantInfoFromTimeline(
-      HoodieStorage storage, String metaPath) throws IOException {
+      HoodieStorage storage, StoragePath metaPath) throws IOException {
     Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> instantMap 
= new HashMap<>();
     Stream<HoodieInstantWithModTime> instantStream =
-        HoodieTableMetaClient.scanFiles(storage, new StoragePath(metaPath), 
path -> {
+        HoodieTableMetaClient.scanFiles(storage, metaPath, path -> {
           // Include only the meta files with extensions that needs to be 
included
           String extension = 
HoodieInstant.getTimelineFileExtension(path.getName());
           return 
HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE.contains(extension);
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
index 22141b1916f..6e831a996ba 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
@@ -139,7 +139,7 @@ public class TestTableCommand extends 
CLIFunctionalTestHarness {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     assertEquals(archivePath, client.getArchivePath());
     assertEquals(tablePath, client.getBasePath());
-    assertEquals(metaPath, client.getMetaPath());
+    assertEquals(metaPath, client.getMetaPath().toString());
     assertEquals(HoodieTableType.COPY_ON_WRITE, client.getTableType());
     assertEquals(new Integer(1), 
client.getTimelineLayoutVersion().getVersion());
 
@@ -164,7 +164,7 @@ public class TestTableCommand extends 
CLIFunctionalTestHarness {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     assertEquals(metaPath + StoragePath.SEPARATOR + "archive", 
client.getArchivePath());
     assertEquals(tablePath, client.getBasePath());
-    assertEquals(metaPath, client.getMetaPath());
+    assertEquals(metaPath, client.getMetaPath().toString());
     assertEquals(HoodieTableType.MERGE_ON_READ, client.getTableType());
   }
 
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
index 5211da14b18..9d1169b4245 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
@@ -117,7 +117,7 @@ public class TestUpgradeDowngradeCommand extends 
CLIFunctionalTestHarness {
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FIVE);
     try (OutputStream os = metaClient.getStorage().create(
         new StoragePath(
-            metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE),
+            metaClient.getMetaPath(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE),
         true)) {
       metaClient.getTableConfig().getProps().store(os, "");
     }
@@ -167,7 +167,7 @@ public class TestUpgradeDowngradeCommand extends 
CLIFunctionalTestHarness {
   private void assertTableVersionFromPropertyFile(HoodieTableVersion 
expectedVersion) throws IOException {
     StoragePath propertyFile =
         new StoragePath(
-            metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+            metaClient.getMetaPath(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     // Load the properties and verify
     InputStream inputStream = metaClient.getStorage().open(propertyFile);
     HoodieConfig config = new HoodieConfig();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
index e7e8e6c1b5a..dcdc45932c2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
@@ -54,7 +54,7 @@ public class HeartbeatUtils {
     boolean deleted = false;
     try {
       String heartbeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
-      deleted = storage.deleteFile(new StoragePath(heartbeatFolderPath + 
StoragePath.SEPARATOR + instantTime));
+      deleted = storage.deleteFile(new StoragePath(heartbeatFolderPath, 
instantTime));
       if (!deleted) {
         LOG.error("Failed to delete heartbeat for instant " + instantTime);
       } else {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
index 460ebdfd11e..0238f6e7f45 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
@@ -227,7 +227,7 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
 
   public static Boolean heartbeatExists(HoodieStorage storage, String 
basePath, String instantTime) throws IOException {
     StoragePath heartbeatFilePath = new StoragePath(
-        HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + 
StoragePath.SEPARATOR + instantTime);
+        HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime);
     return storage.exists(heartbeatFilePath);
   }
 
@@ -255,7 +255,7 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
       Long newHeartbeatTime = System.currentTimeMillis();
       OutputStream outputStream =
           this.storage.create(
-              new StoragePath(heartbeatFolderPath + StoragePath.SEPARATOR + 
instantTime), true);
+              new StoragePath(heartbeatFolderPath, instantTime), true);
       outputStream.close();
       Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
       if (heartbeat.getLastHeartbeatTime() != null && 
isHeartbeatExpired(instantTime)) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
index 24a0e1ce9f4..8ecdb9dccc8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
@@ -140,7 +140,7 @@ public class LegacyArchivedMetaEntryReader {
     try {
       // List all files
       List<StoragePathInfo> pathInfoList = metaClient.getStorage().globEntries(
-          new StoragePath(metaClient.getArchivePath() + 
"/.commits_.archive*"));
+          new StoragePath(metaClient.getArchivePath(), ".commits_.archive*"));
 
       // Sort files by version suffix in reverse (implies reverse 
chronological order)
       pathInfoList.sort(new ArchiveLogVersionComparator());
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index 069ec9e5b74..99b5d833f50 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -58,6 +58,7 @@ import static 
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHI
 import static 
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
 import static 
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Utilities class for consistent bucket index metadata management.
@@ -211,8 +212,8 @@ public class ConsistentBucketIndexUtils {
    */
   private static void createCommitMarker(HoodieTable table, Path fileStatus, 
Path partitionPath) throws IOException {
     HoodieStorage storage = table.getMetaClient().getStorage();
-    StoragePath fullPath = new StoragePath(
-        partitionPath.toString(), getTimestampFromFile(fileStatus.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    StoragePath fullPath = new StoragePath(convertToStoragePath(partitionPath),
+        getTimestampFromFile(fileStatus.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
     if (storage.exists(fullPath)) {
       return;
     }
@@ -239,8 +240,7 @@ public class ConsistentBucketIndexUtils {
     if (metaFile == null) {
       return Option.empty();
     }
-    try (InputStream is = table.getMetaClient().getStorage().open(
-        new StoragePath(metaFile.getPath().toUri()))) {
+    try (InputStream is = 
table.getMetaClient().getStorage().open(convertToStoragePath(metaFile.getPath())))
 {
       byte[] content = FileIOUtils.readAsByteArray(is);
       return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
     } catch (FileNotFoundException e) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index ef695b2ad55..77c4a841f3c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -26,7 +26,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import org.slf4j.Logger;
@@ -102,7 +101,7 @@ public class HoodieKeyLookupHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K,
 
     HoodieBaseFile baseFile = getLatestBaseFile();
     List<Pair<String, Long>> matchingKeysAndPositions = 
HoodieIndexUtils.filterKeysFromFile(
-        new StoragePath(baseFile.getPath()), candidateRecordKeys, 
hoodieTable.getStorageConf());
+        baseFile.getStoragePath(), candidateRecordKeys, 
hoodieTable.getStorageConf());
     LOG.info(
         String.format("Total records (%d), bloom filter candidates 
(%d)/fp(%d), actual matches (%d)", totalKeysChecked,
             candidateRecordKeys.size(), candidateRecordKeys.size() - 
matchingKeysAndPositions.size(), matchingKeysAndPositions.size()));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 03227b75f64..5f9afc1bad1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -25,7 +25,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import java.io.IOException;
@@ -71,11 +70,11 @@ public abstract class HoodieReadHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
 
   protected HoodieFileReader createNewFileReader() throws IOException {
     return 
HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
-        .getFileReader(config, hoodieTable.getStorageConf(), new 
StoragePath(getLatestBaseFile().getPath()));
+        .getFileReader(config, hoodieTable.getStorageConf(), 
getLatestBaseFile().getStoragePath());
   }
 
   protected HoodieFileReader createNewFileReader(HoodieBaseFile 
hoodieBaseFile) throws IOException {
     return 
HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
-        .getFileReader(config, hoodieTable.getStorageConf(), new 
StoragePath(hoodieBaseFile.getPath()));
+        .getFileReader(config, hoodieTable.getStorageConf(), 
hoodieBaseFile.getStoragePath());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 1feeca51578..b9b25b870a2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -122,7 +122,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
 
-    return new StoragePath(path.toString(),
+    return new StoragePath(path,
         FSUtils.makeBaseFileName(instantTime, writeToken, fileId, 
hoodieTable.getBaseFileExtension()));
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f01b5bd3a08..4fc3271e8a0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -643,8 +643,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     final int fileListingParallelism = 
metadataWriteConfig.getFileListingParallelism();
     StorageConfiguration<?> storageConf = dataMetaClient.getStorageConf();
     final String dirFilterRegex = 
dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
-    final String datasetBasePath = dataMetaClient.getBasePathV2().toString();
-    StoragePath storageBasePath = new StoragePath(datasetBasePath);
+    StoragePath storageBasePath = dataMetaClient.getBasePathV2();
 
     while (!pathsToList.isEmpty()) {
       // In each round we will list a section of directories
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 524901dd9bb..adbd4112438 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -1059,10 +1059,10 @@ public abstract class HoodieTable<T, I, K, O> 
implements Serializable {
     if (clearAll && partitions.size() > 0) {
       LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
       metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), 
EMPTY_STRING);
-      HoodieTableConfig.update(metaClient.getStorage(), new 
StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+      HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
     } else if (partitionType.isPresent() && 
partitions.remove(partitionType.get().getPartitionPath())) {
       
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(),
 String.join(",", partitions));
-      HoodieTableConfig.update(metaClient.getStorage(), new 
StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+      HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 3dc2c6f5ed1..38383fd7a88 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -110,8 +110,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
       ClosableIterator<HoodieRecord> recordIterator;
       Schema recordSchema;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        StoragePath bootstrapFilePath =
-            new StoragePath(baseFile.getBootstrapBaseFile().get().getPath());
+        StoragePath bootstrapFilePath = 
baseFile.getBootstrapBaseFile().get().getStoragePath();
         StorageConfiguration<?> bootstrapFileConfig = 
table.getStorageConf().newInstance();
         bootstrapFileReader = 
HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
             baseFileReader,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index ab2096b2ede..61e234534bc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -40,7 +40,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.metadata.HoodieMetadataMetrics;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 
@@ -217,7 +216,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I,
     
table.getMetaClient().getTableConfig().setValue(TABLE_METADATA_PARTITIONS_INFLIGHT.key(),
 String.join(",", inflightPartitions));
     
table.getMetaClient().getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(),
 String.join(",", completedPartitions));
     HoodieTableConfig.update(table.getMetaClient().getStorage(),
-        new StoragePath(table.getMetaClient().getMetaPath()), 
table.getMetaClient().getTableConfig().getProps());
+        table.getMetaClient().getMetaPath(), 
table.getMetaClient().getTableConfig().getProps());
 
     // delete metadata partition
     requestedPartitions.forEach(partition -> {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java
index f767ca3dcb2..dd87490d880 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java
@@ -53,8 +53,7 @@ public abstract class BaseHoodieFunctionalIndexClient {
     // update table config if necessary
     if 
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.INDEX_DEFINITION_PATH)
 || !metaClient.getTableConfig().getIndexDefinitionPath().isPresent()) {
       
metaClient.getTableConfig().setValue(HoodieTableConfig.INDEX_DEFINITION_PATH, 
indexMetaPath);
-      HoodieTableConfig.update(metaClient.getStorage(),
-          new StoragePath(metaClient.getMetaPath()), 
metaClient.getTableConfig().getProps());
+      HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 3e5029720c6..dbf4ff45917 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -33,7 +33,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -54,6 +53,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.timeline.MetadataConversionUtils.getHoodieCommitMetadata;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 import static 
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
 
 /**
@@ -186,7 +186,7 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
         return HoodieTimeline.compareTimestamps(commit, 
HoodieTimeline.LESSER_THAN_OR_EQUALS,
             fileCommitTime);
       } else if (HadoopFSUtils.isLogFile(path)) {
-        String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(new 
StoragePath(path.toUri()));
+        String fileCommitTime = 
FSUtils.getDeltaCommitTimeFromLogPath(convertToStoragePath(path));
         return completionTimeQueryView.isSlicedAfterOrOn(commit, 
fileCommitTime);
       }
       return false;
@@ -299,7 +299,7 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
         return commit.equals(fileCommitTime);
       } else if (HadoopFSUtils.isLogFile(path)) {
         // Since the baseCommitTime is the only commit for new log files, it's 
okay here
-        String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(new 
StoragePath(path.toUri()));
+        String fileCommitTime = 
FSUtils.getDeltaCommitTimeFromLogPath(convertToStoragePath(path));
         return commit.equals(fileCommitTime);
       }
       return false;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 03c715e01e7..b5177a5746b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -58,8 +58,8 @@ public class UpgradeDowngrade {
     this.metaClient = metaClient;
     this.config = config;
     this.context = context;
-    this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), 
HOODIE_UPDATED_PROPERTY_FILE);
-    this.propsFilePath = new Path(metaClient.getMetaPath(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+    this.updatedPropsFilePath = new Path(metaClient.getMetaPath().toString(), 
HOODIE_UPDATED_PROPERTY_FILE);
+    this.propsFilePath = new Path(metaClient.getMetaPath().toString(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     this.upgradeDowngradeHelper = upgradeDowngradeHelper;
   }
 
@@ -158,7 +158,7 @@ public class UpgradeDowngrade {
     metaClient.getTableConfig().setTableVersion(toVersion);
 
     HoodieTableConfig.update(metaClient.getStorage(),
-        new StoragePath(metaClient.getMetaPath()), 
metaClient.getTableConfig().getProps());
+        metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
   }
 
   protected Map<ConfigProperty, String> upgrade(HoodieTableVersion 
fromVersion, HoodieTableVersion toVersion, String instantTime) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 0037e3b301d..74a8a3f3d32 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -132,7 +132,7 @@ public class ZeroToOneUpgradeHandler implements 
UpgradeHandler {
    * @return the marker file name thus curated.
    */
   private static String getFileNameForMarkerFromLogFile(String logFilePath, 
HoodieTable<?, ?, ?, ?> table) {
-    StoragePath logPath = new StoragePath(table.getMetaClient().getBasePath(), 
logFilePath);
+    StoragePath logPath = new 
StoragePath(table.getMetaClient().getBasePathV2(), logFilePath);
     String fileId = FSUtils.getFileIdFromLogPath(logPath);
     String deltaInstant = FSUtils.getDeltaCommitTimeFromLogPath(logPath);
     String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
index 188a92663ee..4bc55408cbb 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path;
 import java.util.Iterator;
 import java.util.Map;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
 /**
  * Factory clazz for flink write handles.
  */
@@ -108,7 +110,7 @@ public class FlinkWriteHandleFactory {
       Path writePath = bucketToHandles.get(fileID);
       if (writePath != null) {
         HoodieWriteHandle<?, ?, ?, ?> writeHandle =
-            createReplaceHandle(config, instantTime, table, recordItr, 
partitionPath, fileID, new StoragePath(writePath.toUri()));
+            createReplaceHandle(config, instantTime, table, recordItr, 
partitionPath, fileID, convertToStoragePath(writePath));
         bucketToHandles.put(fileID, new Path(((MiniBatchHandle) 
writeHandle).getWritePath().toUri())); // override with new replace handle
         return writeHandle;
       }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 683282853de..15bbc7fa668 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -49,6 +49,8 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
 /**
  * Create handle with RowData for datasource implementation of bulk insert.
  */
@@ -171,9 +173,10 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
     stat.setNumInserts(writeStatus.getTotalRecords());
     stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
     stat.setFileId(fileId);
-    stat.setPath(new StoragePath(writeConfig.getBasePath()), new 
StoragePath(path.toUri()));
+    StoragePath storagePath = convertToStoragePath(path);
+    stat.setPath(new StoragePath(writeConfig.getBasePath()), storagePath);
     long fileSizeInBytes = FSUtils.getFileSize(
-        table.getMetaClient().getStorage(), new StoragePath(path.toUri()));
+        table.getMetaClient().getStorage(), storagePath);
     stat.setTotalWriteBytes(fileSizeInBytes);
     stat.setFileSizeInBytes(fileSizeInBytes);
     stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index e9bc86b4a76..be757a30954 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 
@@ -34,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import java.io.IOException;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Factory to assist in instantiating a new {@link HoodieRowDataFileWriter}.
@@ -71,7 +71,7 @@ public class HoodieRowDataFileWriterFactory {
     HoodieRowDataParquetWriteSupport writeSupport =
         new HoodieRowDataParquetWriteSupport((Configuration) 
table.getStorageConf().unwrap(), rowType, filter);
     return new HoodieRowDataParquetWriter(
-        new StoragePath(path.toUri()), new HoodieParquetConfig<>(
+        convertToStoragePath(path), new HoodieParquetConfig<>(
         writeSupport,
         writeConfig.getParquetCompressionCodec(),
         writeConfig.getParquetBlockSize(),
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 6a7bfd3f280..225e260bfb6 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -35,7 +35,6 @@ import org.apache.hudi.index.JavaHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.JavaHoodieBackedTableMetadataWriter;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import java.io.IOException;
@@ -94,8 +93,8 @@ public abstract class HoodieJavaTable<T>
       // delete metadata partitions corresponding to such indexes
       deleteMetadataIndexIfNecessary();
       try {
-        if (isMetadataTableExists || metaClient.getStorage().exists(new 
StoragePath(
-            
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+        if (isMetadataTableExists || metaClient.getStorage().exists(
+            
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2()))) {
           isMetadataTableExists = true;
           return Option.of(metadataWriter);
         }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 82b4e0c366e..f9fdf0af0c7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -53,6 +53,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
 
@@ -156,7 +157,7 @@ public class SparkMetadataWriterUtils {
       long fileSize,
       Path filePath) {
     Dataset<Row> fileDf = readRecordsAsRow(
-        new StoragePath[] {new StoragePath(filePath.toUri())},
+        new StoragePath[] {convertToStoragePath(filePath)},
         sqlContext,
         metaClient,
         readerSchema);
@@ -179,7 +180,7 @@ public class SparkMetadataWriterUtils {
       String partitionName,
       String instantTime) {
     Dataset<Row> fileDf =
-        readRecordsAsRow(new StoragePath[] {new StoragePath(filePath.toUri())},
+        readRecordsAsRow(new StoragePath[] {convertToStoragePath(filePath)},
             sqlContext, metaClient, readerSchema);
     Column indexedColumn = 
functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
     fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
index 0f2e5ae5f1f..ae3cb76e33d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -29,7 +29,6 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -128,7 +127,7 @@ public class HoodieFileProbingFunction implements
 
             final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
             List<Pair<String, Long>> matchingKeysAndPositions = 
HoodieIndexUtils.filterKeysFromFile(
-                new StoragePath(dataFile.getPath()), candidateRecordKeys, 
storageConf);
+                dataFile.getStoragePath(), candidateRecordKeys, storageConf);
 
             LOG.debug(
                 String.format("Bloom filter candidates (%d) / false positives 
(%d), actual matches (%d)",
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index ff47e3e5d90..ecf5bd88135 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -38,7 +38,6 @@ import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.commit.HoodieMergeHelper;
 
 import org.apache.hadoop.conf.Configuration;
@@ -112,8 +111,8 @@ public abstract class HoodieSparkTable<T>
           context.getStorageConf(), config, failedWritesCleaningPolicy, 
context,
           Option.of(triggeringInstantTimestamp));
       try {
-        if (isMetadataTableExists || metaClient.getStorage().exists(new 
StoragePath(
-            
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+        if (isMetadataTableExists || metaClient.getStorage().exists(
+            
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2()))) {
           isMetadataTableExists = true;
           return Option.of(metadataWriter);
         }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index cfe4fbad4c3..2c145f5b10e 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2040,7 +2040,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
     // collect all commit meta files from metadata table.
     List<StoragePathInfo> metaFiles = metaClient.getStorage()
-        .listDirectEntries(new StoragePath(metaClient.getMetaPath() + 
"/metadata/.hoodie"));
+        .listDirectEntries(new StoragePath(metaClient.getMetaPath(), 
"metadata/.hoodie"));
     List<StoragePathInfo> commit3Files = metaFiles.stream()
         .filter(pathInfo ->
             pathInfo.getPath().getName().contains(commit3)
@@ -3787,7 +3787,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     metaClient.getTableConfig().setTableVersion(version);
     StoragePath propertyFile = new StoragePath(
-        metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+        metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     try (OutputStream os = metaClient.getStorage().create(propertyFile)) {
       metaClient.getTableConfig().getProps().store(os, "");
     }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index dd1468c0580..67d618e616d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -361,7 +361,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     HoodieTable table =
         this.getHoodieTable(metaClient, 
getConfigBuilder().withRollbackBackupEnabled(true).build());
     HoodieInstant needRollBackInstant = HoodieTestUtils.getCompleteInstant(
-        metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()),
+        metaClient.getStorage(), metaClient.getMetaPath(),
         "002", HoodieTimeline.COMMIT_ACTION);
 
     // Create the rollback plan and perform the rollback
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index c1a207cd895..275d62bc4f3 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -790,7 +790,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends SparkClientFunction
       for (HoodieInstant.State state : 
Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT)) {
         HoodieInstant toCopy = new HoodieInstant(state, 
HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime);
         File file = Files.createTempFile(tempFolder, null, null).toFile();
-        fs().copyToLocalFile(new Path(metaClient.getMetaPath(), 
toCopy.getFileName()),
+        fs().copyToLocalFile(new Path(metaClient.getMetaPath().toString(), 
toCopy.getFileName()),
             new Path(file.getAbsolutePath()));
         fileNameMap.put(file.getAbsolutePath(), toCopy.getFileName());
       }
@@ -816,7 +816,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends SparkClientFunction
       for (Map.Entry<String, String> entry : fileNameMap.entrySet()) {
         try {
           fs().copyFromLocalFile(new Path(entry.getKey()),
-              new Path(metaClient.getMetaPath(), entry.getValue()));
+              new Path(metaClient.getMetaPath().toString(), entry.getValue()));
         } catch (IOException e) {
           throw new HoodieIOException("Error copying state from local disk.", 
e);
         }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 358bde192e5..3db773549c8 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -509,8 +509,8 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
     metaClient = HoodieTestUtils.init(storageConf, basePath, getTableType(), 
properties);
     // set hoodie.table.version to 4 in hoodie.properties file
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FOUR);
-    HoodieTableConfig.update(metaClient.getStorage(),
-        new StoragePath(metaClient.getMetaPath()), 
metaClient.getTableConfig().getProps());
+    HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(),
+        metaClient.getTableConfig().getProps());
 
     String metadataTablePath =
         
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2().toString());
@@ -519,8 +519,8 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
           
.setConf(metaClient.getStorageConf().newInstance()).setBasePath(metadataTablePath).build();
       metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FOUR);
       HoodieTableConfig.update(
-          mdtMetaClient.getStorage(),
-          new StoragePath(mdtMetaClient.getMetaPath()), 
metaClient.getTableConfig().getProps());
+          mdtMetaClient.getStorage(), mdtMetaClient.getMetaPath(),
+          metaClient.getTableConfig().getProps());
     }
 
     assertTableVersionOnDataAndMetadataTable(metaClient, 
HoodieTableVersion.FOUR);
@@ -902,7 +902,7 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
   private void prepForDowngradeFromVersion(HoodieTableVersion fromVersion) 
throws IOException {
     metaClient.getTableConfig().setTableVersion(fromVersion);
     StoragePath propertyFile = new StoragePath(
-        metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+        metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     try (OutputStream os = metaClient.getStorage().create(propertyFile)) {
       metaClient.getTableConfig().getProps().store(os, "");
     }
@@ -910,9 +910,9 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
 
   private void createResidualFile() throws IOException {
     Path propertyFile =
-        new Path(metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+        new Path(metaClient.getMetaPath().toString(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     Path updatedPropertyFile =
-        new Path(metaClient.getMetaPath() + "/" + 
UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
+        new Path(metaClient.getMetaPath().toString(), 
UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
 
     // Step1: Copy hoodie.properties to hoodie.properties.orig
     FileSystem fs = (FileSystem) metaClient.getStorage().getFileSystem();
@@ -938,7 +938,7 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
     assertEquals(expectedVersion.versionCode(),
         metaClient.getTableConfig().getTableVersion().versionCode());
     StoragePath propertyFile = new StoragePath(
-        metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+        metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     // Load the properties and verify
     InputStream inputStream = metaClient.getStorage().open(propertyFile);
     HoodieConfig config = new HoodieConfig();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieFunctionalIndexConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieFunctionalIndexConfig.java
index dc267f88c3c..2598511a60a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieFunctionalIndexConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieFunctionalIndexConfig.java
@@ -136,7 +136,7 @@ public class HoodieFunctionalIndexConfig extends 
HoodieConfig {
 
       // 1. Read the existing config
       TypedProperties props =
-          fetchConfigs(storage, metadataFolder.toString(), 
INDEX_DEFINITION_FILE,
+          fetchConfigs(storage, metadataFolder, INDEX_DEFINITION_FILE,
               INDEX_DEFINITION_FILE_BACKUP, MAX_READ_RETRIES, 
READ_RETRY_DELAY_MSEC);
 
       // 2. backup the existing properties.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 4abcae863ff..0829216216c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -92,7 +92,7 @@ public class FSUtils {
    * @return {@code true} if table exists. {@code false} otherwise.
    */
   public static boolean isTableExists(String path, HoodieStorage storage) 
throws IOException {
-    return storage.exists(new StoragePath(path + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME));
+    return storage.exists(new StoragePath(path, 
HoodieTableMetaClient.METAFOLDER_NAME));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
index 0631ed587f1..7e6ce0e2135 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
@@ -46,7 +46,7 @@ public class HoodieHeartbeatUtils {
   public static Long getLastHeartbeatTime(HoodieStorage storage, String 
basePath,
                                           String instantTime) throws 
IOException {
     StoragePath heartbeatFilePath = new StoragePath(
-        HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + 
StoragePath.SEPARATOR + instantTime);
+        HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime);
     if (storage.exists(heartbeatFilePath)) {
       return storage.getPathInfo(heartbeatFilePath).getModificationTime();
     } else {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index efdbba8a5b8..9210d1521ed 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -303,7 +303,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // 
<database_name>.<table_name>
 
-  public HoodieTableConfig(HoodieStorage storage, String metaPath, String 
payloadClassName, String recordMergerStrategyId) {
+  public HoodieTableConfig(HoodieStorage storage, StoragePath metaPath, String 
payloadClassName, String recordMergerStrategyId) {
     super();
     StoragePath propertyPath = new StoragePath(metaPath, 
HOODIE_PROPERTIES_FILE);
     LOG.info("Loading table properties from " + propertyPath);
@@ -389,7 +389,7 @@ public class HoodieTableConfig extends HoodieConfig {
       recoverIfNeeded(storage, cfgPath, backupCfgPath);
 
       // 1. Read the existing config
-      TypedProperties props = fetchConfigs(storage, metadataFolder.toString(), 
HOODIE_PROPERTIES_FILE, HOODIE_PROPERTIES_FILE_BACKUP, MAX_READ_RETRIES, 
READ_RETRY_DELAY_MSEC);
+      TypedProperties props = fetchConfigs(storage, metadataFolder, 
HOODIE_PROPERTIES_FILE, HOODIE_PROPERTIES_FILE_BACKUP, MAX_READ_RETRIES, 
READ_RETRY_DELAY_MSEC);
 
       // 2. backup the existing properties.
       try (OutputStream out = storage.create(backupCfgPath, false)) {
@@ -775,7 +775,7 @@ public class HoodieTableConfig extends HoodieConfig {
     }
     setValue(TABLE_METADATA_PARTITIONS, 
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
     setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
-    update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), 
getProps());
+    update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
     LOG.info(String.format("MDT %s partition %s has been %s", 
metaClient.getBasePathV2(), partitionPath, enabled ? "enabled" : "disabled"));
   }
 
@@ -793,7 +793,7 @@ public class HoodieTableConfig extends HoodieConfig {
     });
 
     setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
-    update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), 
getProps());
+    update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
     LOG.info(String.format("MDT %s partitions %s have been set to inflight", 
metaClient.getBasePathV2(), partitionPaths));
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 38bf3e43d45..943c8ab9dc4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -153,7 +153,7 @@ public class HoodieTableMetaClient implements Serializable {
     this.metaPath = new StoragePath(basePath, METAFOLDER_NAME);
     this.storage = getStorage();
     TableNotFoundException.checkTableValidity(storage, this.basePath, 
metaPath);
-    this.tableConfig = new HoodieTableConfig(storage, metaPath.toString(), 
payloadClassName, recordMergerStrategy);
+    this.tableConfig = new HoodieTableConfig(storage, metaPath, 
payloadClassName, recordMergerStrategy);
     this.functionalIndexMetadata = getFunctionalIndexMetadata();
     this.tableType = tableConfig.getTableType();
     Option<TimelineLayoutVersion> tableConfigVersion = 
tableConfig.getTimelineLayoutVersion();
@@ -295,8 +295,8 @@ public class HoodieTableMetaClient implements Serializable {
   /**
    * @return Meta path
    */
-  public String getMetaPath() {
-    return metaPath.toString();  // this invocation is cached
+  public StoragePath getMetaPath() {
+    return metaPath;
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 350ee050a68..376b6535af9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -273,7 +273,7 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
-  public static void deleteInstantFile(HoodieStorage storage, String metaPath, 
HoodieInstant instant) {
+  public static void deleteInstantFile(HoodieStorage storage, StoragePath 
metaPath, HoodieInstant instant) {
     try {
       storage.deleteFile(new StoragePath(metaPath, instant.getFileName()));
     } catch (IOException e) {
@@ -750,7 +750,7 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
   }
 
   private StoragePath getInstantFileNamePath(String fileName) {
-    return new StoragePath(fileName.contains(SCHEMA_COMMIT_ACTION) ? 
metaClient.getSchemaFolderName() : metaClient.getMetaPath(), fileName);
+    return new StoragePath(fileName.contains(SCHEMA_COMMIT_ACTION) ? 
metaClient.getSchemaFolderName() : metaClient.getMetaPath().toString(), 
fileName);
   }
 
   public void transitionRequestedToInflight(String commitType, String 
inFlightInstant) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java
index e65f19982e1..c482de08ac3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java
@@ -228,7 +228,7 @@ public class LSMTimeline {
    */
   public static List<StoragePathInfo> listAllMetaFiles(HoodieTableMetaClient 
metaClient) throws IOException {
     return metaClient.getStorage().globEntries(
-        new StoragePath(metaClient.getArchivePath() + "/*.parquet"));
+        new StoragePath(metaClient.getArchivePath(), "*.parquet"));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
index ea6b8f429bd..9c6c05f4523 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
@@ -71,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView {
     Map<String, HoodieBaseFile> newFilesWrittenForPartition = 
filesWritten.stream()
         .filter(file -> partitionStr.equals(file.getPartitionPath()))
         .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat -> 
-            new HoodieBaseFile(new StoragePath(tableMetaClient.getBasePath(), 
writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, 
null)));
+            new HoodieBaseFile(new 
StoragePath(tableMetaClient.getBasePathV2(), writeStat.getPath()).toString(), 
writeStat.getFileId(), preCommitInstantTime, null)));
 
     Stream<HoodieBaseFile> committedBaseFiles = 
this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
     Map<String, HoodieBaseFile> allFileIds = committedBaseFiles
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 12a6e9ab7ea..223a79e6c65 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -510,7 +510,7 @@ public class ConfigUtils {
 
   public static TypedProperties fetchConfigs(
       HoodieStorage storage,
-      String metaPath,
+      StoragePath metaPath,
       String propertiesFile,
       String propertiesBackupFile,
       int maxReadRetries,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
 
b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
index bc63319fa34..f3c594d6c77 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieSecondaryIndexException;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -123,7 +122,7 @@ public class SecondaryIndexManager {
     Properties updatedProps = new Properties();
     updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
         SecondaryIndexUtils.toJsonString(newSecondaryIndexes));
-    HoodieTableConfig.update(metaClient.getStorage(), new 
StoragePath(metaClient.getMetaPath()), updatedProps);
+    HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), updatedProps);
 
     LOG.info("Success to add secondary index metadata: {}", 
secondaryIndexToAdd);
 
@@ -155,9 +154,9 @@ public class SecondaryIndexManager {
       Properties updatedProps = new Properties();
       updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
           SecondaryIndexUtils.toJsonString(secondaryIndexesToKeep));
-      HoodieTableConfig.update(metaClient.getStorage(), new 
StoragePath(metaClient.getMetaPath()), updatedProps);
+      HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), updatedProps);
     } else {
-      HoodieTableConfig.delete(metaClient.getStorage(), new 
StoragePath(metaClient.getMetaPath()),
+      HoodieTableConfig.delete(metaClient.getStorage(), 
metaClient.getMetaPath(),
           
CollectionUtils.createSet(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key()));
     }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
index 854b465b7d0..e5a58631059 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
@@ -61,14 +61,13 @@ public class FileBasedInternalSchemaStorageManager extends 
AbstractInternalSchem
   private HoodieTableMetaClient metaClient;
 
   public FileBasedInternalSchemaStorageManager(StorageConfiguration<?> conf, 
StoragePath baseTablePath) {
-    StoragePath metaPath = new StoragePath(baseTablePath, ".hoodie");
+    StoragePath metaPath = new StoragePath(baseTablePath, 
HoodieTableMetaClient.METAFOLDER_NAME);
     this.baseSchemaPath = new StoragePath(metaPath, SCHEMA_NAME);
     this.conf = conf;
   }
 
   public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient 
metaClient) {
-    StoragePath metaPath = new StoragePath(metaClient.getBasePath(), 
".hoodie");
-    this.baseSchemaPath = new StoragePath(metaPath, SCHEMA_NAME);
+    this.baseSchemaPath = new StoragePath(metaClient.getMetaPath(), 
SCHEMA_NAME);
     this.conf = metaClient.getStorageConf();
     this.metaClient = metaClient;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index b90d94a017b..5b2d3f68c17 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -81,7 +81,7 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
     StoragePath metaPath =
         new StoragePath(dataBasePath, HoodieTableMetaClient.METAFOLDER_NAME);
     TableNotFoundException.checkTableValidity(storage, this.dataBasePath, 
metaPath);
-    HoodieTableConfig tableConfig = new HoodieTableConfig(storage, 
metaPath.toString(), null, null);
+    HoodieTableConfig tableConfig = new HoodieTableConfig(storage, metaPath, 
null, null);
     this.hiveStylePartitioningEnabled =
         Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
     this.urlEncodePartitioningEnabled =
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index d3348ba8d03..9a525a8142c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -605,9 +605,9 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     // If the base file is present then create a reader
     Option<HoodieBaseFile> basefile = slice.getBaseFile();
     if (basefile.isPresent()) {
-      String baseFilePath = basefile.get().getPath();
+      StoragePath baseFilePath = basefile.get().getStoragePath();
       baseFileReader = (HoodieSeekingFileReader<?>) 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
-          .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), new 
StoragePath(baseFilePath));
+          .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), 
baseFilePath);
       baseFileOpenMs = timer.endTimer();
       LOG.info(String.format("Opened metadata base file from %s at instant %s 
in %d ms", baseFilePath,
           basefile.get().getCommitTime(), baseFileOpenMs));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index dc8977668b2..b171560643b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -42,7 +42,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.meta.CkpMetadataFactory;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.util.FlinkTables;
@@ -223,7 +222,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
             return;
           }
           try (ClosableIterator<HoodieKey> iterator = 
fileUtils.getHoodieKeyIterator(
-              HadoopFSUtils.getStorageConf(this.hadoopConf), new 
StoragePath(baseFile.getPath()))) {
+              HadoopFSUtils.getStorageConf(this.hadoopConf), 
baseFile.getStoragePath())) {
             iterator.forEachRemaining(hoodieKey -> {
               output.collect(new StreamRecord(new 
IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
             });
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index f6f6e5b124b..86dd5ad2074 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -372,7 +372,7 @@ public class StreamerUtil {
     StoragePath metaPath = new StoragePath(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME);
     try {
       if (storage.exists(new StoragePath(metaPath, 
HoodieTableConfig.HOODIE_PROPERTIES_FILE))) {
-        return Option.of(new HoodieTableConfig(storage, metaPath.toString(), 
null, null));
+        return Option.of(new HoodieTableConfig(storage, metaPath, null, null));
       }
     } catch (IOException e) {
       throw new HoodieIOException("Get table config error", e);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
index c5875acd87b..96cc3569e83 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -109,7 +109,7 @@ public class ITTestBucketStreamWrite {
 
     // delete successful commit to simulate an unsuccessful write
     HoodieStorage storage = metaClient.getStorage();
-    StoragePath path = new StoragePath(metaClient.getMetaPath() + 
StoragePath.SEPARATOR + filename);
+    StoragePath path = new StoragePath(metaClient.getMetaPath(), filename);
     storage.deleteDirectory(path);
 
     commitMetadata.getFileIdAndRelativePaths().forEach((fileId, relativePath) 
-> {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 4126e06e07a..4dbc71dca7a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -1143,7 +1143,7 @@ public class TestInputFormat {
     assertTrue(firstCommit.isPresent());
     assertThat(firstCommit.get().getAction(), 
is(HoodieTimeline.DELTA_COMMIT_ACTION));
 
-    java.nio.file.Path metaFilePath = Paths.get(metaClient.getMetaPath(), 
firstCommit.get().getFileName());
+    java.nio.file.Path metaFilePath = 
Paths.get(metaClient.getMetaPath().toString(), firstCommit.get().getFileName());
     String newCompletionTime = 
TestUtils.amendCompletionTimeToLatest(metaClient, metaFilePath, 
firstCommit.get().getTimestamp());
 
     InputFormat<RowData, ?> inputFormat = 
this.tableSource.getInputFormat(true);
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
index 662c2ffe35a..2e3f546debe 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
@@ -64,7 +64,7 @@ public class DFSPropertiesConfiguration extends 
PropertiesConfig {
   public static final String CONF_FILE_DIR_ENV_NAME = "HUDI_CONF_DIR";
   public static final String DEFAULT_CONF_FILE_DIR = "file:/etc/hudi/conf";
   public static final StoragePath DEFAULT_PATH = new StoragePath(
-      DEFAULT_CONF_FILE_DIR + "/" + DEFAULT_PROPERTIES_FILE);
+      DEFAULT_CONF_FILE_DIR, DEFAULT_PROPERTIES_FILE);
 
   // props read from hudi-defaults.conf
   private static TypedProperties GLOBAL_PROPS = loadGlobalProps();
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
index a9f19c7ee01..7cf65ce1caa 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.permission.FsAction;
 import org.junit.jupiter.api.AfterEach;
@@ -100,7 +99,7 @@ public class TestBootstrapIndex extends 
HoodieCommonTestHarness {
     props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), "false");
     Properties properties = new Properties();
     properties.putAll(props);
-    HoodieTableConfig.create(metaClient.getStorage(), new 
StoragePath(metaClient.getMetaPath()), properties);
+    HoodieTableConfig.create(metaClient.getStorage(), 
metaClient.getMetaPath(), properties);
 
     metaClient = createMetaClient(metaClient.getStorageConf().newInstance(), 
basePath);
     BootstrapIndex bootstrapIndex = 
BootstrapIndex.getBootstrapIndex(metaClient);
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
index 2093e658c4e..7eb2901c1d3 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
@@ -70,7 +70,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends 
TestFSUtils {
     initialRetryIntervalMs = fileSystemRetryConfig.getInitialRetryIntervalMs();
 
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), 
metaClient.getStorageConf()), 2);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), 
metaClient.getStorageConf()), 2);
     FileSystem fileSystem =
         new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, 
maxRetryNumbers,
             initialRetryIntervalMs, "");
@@ -85,7 +85,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends 
TestFSUtils {
   @Test
   public void testProcessFilesWithExceptions() throws Exception {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), 
metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), 
metaClient.getStorageConf()), 100);
     FileSystem fileSystem =
         new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, 
maxRetryNumbers,
             initialRetryIntervalMs, "");
@@ -102,7 +102,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends 
TestFSUtils {
   @Test
   public void testGetSchema() {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), 
metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), 
metaClient.getStorageConf()), 100);
     FileSystem fileSystem =
         new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, 
maxRetryNumbers,
             initialRetryIntervalMs, "");
@@ -114,7 +114,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends 
TestFSUtils {
   @Test
   public void testGetDefaultReplication() {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), 
metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), 
metaClient.getStorageConf()), 100);
     FileSystem fileSystem =
         new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, 
maxRetryNumbers,
             initialRetryIntervalMs, "");
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 70cb9eea6d5..9e5ad70fdca 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -80,7 +80,7 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
   public void testCreate() throws IOException {
     assertTrue(
         storage.exists(new StoragePath(metaPath, 
HoodieTableConfig.HOODIE_PROPERTIES_FILE)));
-    HoodieTableConfig config = new HoodieTableConfig(storage, 
metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, 
null);
     assertEquals(6, config.getProps().size());
   }
 
@@ -93,7 +93,7 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
 
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
-    HoodieTableConfig config = new HoodieTableConfig(storage, 
metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, 
null);
     assertEquals(7, config.getProps().size());
     assertEquals("test-table2", config.getTableName());
     assertEquals("new_field", config.getPreCombineField());
@@ -107,7 +107,7 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
 
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
-    HoodieTableConfig config = new HoodieTableConfig(storage, 
metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, 
null);
     assertEquals(5, config.getProps().size());
     assertNull(config.getProps().getProperty("hoodie.invalid.config"));
     
assertFalse(config.getProps().contains(HoodieTableConfig.ARCHIVELOG_FOLDER.key()));
@@ -117,13 +117,13 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
   public void testReadsWhenPropsFileDoesNotExist() throws IOException {
     storage.deleteFile(cfgPath);
     assertThrows(HoodieIOException.class, () -> {
-      new HoodieTableConfig(storage, metaPath.toString(), null, null);
+      new HoodieTableConfig(storage, metaPath, null, null);
     });
   }
 
   @Test
   public void testReadsWithUpdateFailures() throws IOException {
-    HoodieTableConfig config = new HoodieTableConfig(storage, 
metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, 
null);
     storage.deleteFile(cfgPath);
     try (OutputStream out = storage.create(backupCfgPath)) {
       config.getProps().store(out, "");
@@ -131,14 +131,14 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
 
     assertFalse(storage.exists(cfgPath));
     assertTrue(storage.exists(backupCfgPath));
-    config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(6, config.getProps().size());
   }
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testUpdateRecovery(boolean shouldPropsFileExist) throws 
IOException {
-    HoodieTableConfig config = new HoodieTableConfig(storage, 
metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, 
null);
     if (!shouldPropsFileExist) {
       storage.deleteFile(cfgPath);
     }
@@ -149,7 +149,7 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     recoverIfNeeded(storage, cfgPath, backupCfgPath);
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
-    config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(6, config.getProps().size());
   }
 
@@ -157,11 +157,11 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
   public void testReadRetry() throws IOException {
     // When both the hoodie.properties and hoodie.properties.backup do not 
exist then the read fails
     storage.rename(cfgPath, new StoragePath(cfgPath.toString() + ".bak"));
-    assertThrows(HoodieIOException.class, () -> new HoodieTableConfig(storage, 
metaPath.toString(), null, null));
+    assertThrows(HoodieIOException.class, () -> new HoodieTableConfig(storage, 
metaPath, null, null));
 
     // Should return the backup config if hoodie.properties is not present
     storage.rename(new StoragePath(cfgPath.toString() + ".bak"), 
backupCfgPath);
-    new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    new HoodieTableConfig(storage, metaPath, null, null);
 
     // Should return backup config if hoodie.properties is corrupted
     Properties props = new Properties();
@@ -169,14 +169,14 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     try (OutputStream out = storage.create(cfgPath)) {
       props.store(out, "Wrong checksum in file so is invalid");
     }
-    new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    new HoodieTableConfig(storage, metaPath, null, null);
 
     // Should throw exception if both hoodie.properties and backup are 
corrupted
     try (OutputStream out = storage.create(backupCfgPath)) {
       props.store(out, "Wrong checksum in file so is invalid");
     }
     assertThrows(IllegalArgumentException.class, () -> new 
HoodieTableConfig(storage,
-        metaPath.toString(), null, null));
+        metaPath, null, null));
   }
 
   @Test
@@ -194,7 +194,7 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     Future readerFuture = executor.submit(() -> {
       for (int i = 0; i < 100; i++) {
         // Try to load the table properties, won't throw any exception
-        new HoodieTableConfig(storage, metaPath.toString(), null, null);
+        new HoodieTableConfig(storage, metaPath, null, null);
       }
     });
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index bed20a7fa83..9f3760b70d9 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -59,7 +59,7 @@ public class TestHoodieTableMetaClient extends 
HoodieCommonTestHarness {
     assertEquals(HoodieTestUtils.RAW_TRIPS_TEST_NAME, 
metaClient.getTableConfig().getTableName(),
         "Table name should be raw_trips");
     assertEquals(basePath, metaClient.getBasePath(), "Basepath should be the 
one assigned");
-    assertEquals(basePath + "/.hoodie", metaClient.getMetaPath(),
+    assertEquals(basePath + "/.hoodie", metaClient.getMetaPath().toString(),
         "Metapath should be ${basepath}/.hoodie");
     
assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key()));
     
assertTrue(HoodieTableConfig.validateChecksum(metaClient.getTableConfig().getProps()));
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index a288acf5a31..1e1d2bfee0b 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -202,7 +202,7 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
 
     // Now create a scenario where archiving deleted replace commits 
(requested,inflight and replacecommit)
     StoragePath completeInstantPath = HoodieTestUtils.getCompleteInstantPath(
-        metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()),
+        metaClient.getStorage(), metaClient.getMetaPath(),
         clusteringInstantTime3,
         HoodieTimeline.REPLACE_COMMIT_ACTION);
 
@@ -2291,13 +2291,13 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
 
     HoodieStorage storage = metaClient.getStorage();
     StoragePath instantPath1 = HoodieTestUtils
-        .getCompleteInstantPath(storage, new 
StoragePath(metaClient.getMetaPath()), "1",
+        .getCompleteInstantPath(storage, metaClient.getMetaPath(), "1",
             HoodieTimeline.COMMIT_ACTION);
     storage.deleteFile(instantPath1);
     storage.deleteFile(new StoragePath(basePath + "/.hoodie", "1.inflight"));
     storage.deleteFile(new StoragePath(basePath + "/.hoodie", 
"1.commit.requested"));
     StoragePath instantPath2 = HoodieTestUtils
-        .getCompleteInstantPath(storage, new 
StoragePath(metaClient.getMetaPath()), "2",
+        .getCompleteInstantPath(storage, metaClient.getMetaPath(), "2",
             HoodieTimeline.REPLACE_COMMIT_ACTION);
     storage.deleteFile(instantPath2);
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index 2ac6c3e93bd..11d552a1e1e 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -678,7 +678,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     }
     StoragePath instantPath = HoodieTestUtils
         .getCompleteInstantPath(metaClient.getStorage(),
-            new StoragePath(metaClient.getMetaPath()),
+            metaClient.getMetaPath(),
             instant.getTimestamp(), instant.getAction());
     boolean deleted = metaClient.getStorage().deleteFile(instantPath);
     assertTrue(deleted);
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index d31d6935d74..71232e509b5 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -35,7 +35,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -66,6 +65,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Base implementation of the Hive's {@link FileInputFormat} allowing for 
reading of Hudi's
@@ -255,7 +255,7 @@ public class HoodieCopyOnWriteTableInputFormat extends 
HoodieTableInputFormat {
                 tableMetaClient,
                 props,
                 HoodieTableQueryType.SNAPSHOT,
-                partitionPaths.stream().map(e -> new 
StoragePath(e.toUri())).collect(Collectors.toList()),
+                
partitionPaths.stream().map(HadoopFSUtils::convertToStoragePath).collect(Collectors.toList()),
                 queryCommitInstant,
                 shouldIncludePendingCommits);
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 4110f47385b..97177ab260d 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.RecordReader;
 import java.io.IOException;
 
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieHFileRecordReader implements RecordReader<NullWritable, 
ArrayWritable> {
 
@@ -54,7 +55,7 @@ public class HoodieHFileRecordReader implements 
RecordReader<NullWritable, Array
 
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf 
job) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
-    StoragePath path = new StoragePath(fileSplit.getPath().toUri());
+    StoragePath path = convertToStoragePath(fileSplit.getPath());
     HoodieConfig hoodieConfig = 
getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
     reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), path, 
HoodieFileFormat.HFILE, Option.empty());
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index d6a62f3a061..51d8a9f3af4 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -53,6 +53,7 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static 
org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
 import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version 
of each path - Non-Hoodie table = then
@@ -133,7 +134,7 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
     try {
       if (storage == null) {
         storage =
-            HoodieStorageUtils.getStorage(new StoragePath(path.toUri()), conf);
+            HoodieStorageUtils.getStorage(convertToStoragePath(path), conf);
       }
 
       // Assumes path is a file
@@ -166,8 +167,9 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
 
       // Perform actual checking.
       Path baseDir;
-      if (HoodiePartitionMetadata.hasPartitionMetadata(storage, new 
StoragePath(folder.toUri()))) {
-        HoodiePartitionMetadata metadata = new 
HoodiePartitionMetadata(storage, new StoragePath(folder.toUri()));
+      StoragePath storagePath = convertToStoragePath(folder);
+      if (HoodiePartitionMetadata.hasPartitionMetadata(storage, storagePath)) {
+        HoodiePartitionMetadata metadata = new 
HoodiePartitionMetadata(storage, storagePath);
         metadata.readFromFS();
         baseDir = HoodieHiveUtils.getNthParent(folder, 
metadata.getPartitionDepth());
       } else {
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index 454aa519bd5..79829cc3917 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -71,6 +71,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
 /**
  * This class is responsible for calculating names and types of fields that 
are actual at a certain point in time for hive.
  * If field is renamed in queried schema, its old name will be returned, which 
is relevant at the provided time.
@@ -114,10 +116,9 @@ public class SchemaEvolutionContext {
   private HoodieTableMetaClient setUpHoodieTableMetaClient() throws 
IOException {
     try {
       Path inputPath = ((FileSplit) split).getPath();
-      StoragePath path = new StoragePath(inputPath.toString());
       FileSystem fs = inputPath.getFileSystem(job);
       HoodieStorage storage = HoodieStorageUtils.getStorage(fs);
-      Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, 
path);
+      Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, 
convertToStoragePath(inputPath));
       return 
HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString())
           .setConf(HadoopFSUtils.getStorageConfWithCopy(job)).build();
     } catch (Exception e) {
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index bf4dfaa0476..28b27aebe4f 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -43,7 +43,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
 import org.apache.avro.Schema;
@@ -191,7 +190,7 @@ public class HoodieMergeOnReadTableInputFormat extends 
HoodieCopyOnWriteTableInp
 
     // build fileGroup from fsView
     List<StoragePathInfo> affectedPathInfoList = HoodieInputFormatUtils
-        .listAffectedFilesForCommits(job, new 
StoragePath(tableMetaClient.getBasePath()),
+        .listAffectedFilesForCommits(job, tableMetaClient.getBasePathV2(),
             metadataList);
     // step3
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 9db661daf81..6945b241e0a 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -82,6 +82,7 @@ import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADAT
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static 
org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieInputFormatUtils {
 
@@ -360,14 +361,15 @@ public class HoodieInputFormatUtils {
     Path baseDir = partitionPath;
     HoodieStorage storage = HoodieStorageUtils.getStorage(
         partitionPath.toString(), HadoopFSUtils.getStorageConf(conf));
-    if (HoodiePartitionMetadata.hasPartitionMetadata(storage, new 
StoragePath(partitionPath.toUri()))) {
-      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, 
new StoragePath(partitionPath.toUri()));
+    StoragePath partitionStoragePath = convertToStoragePath(partitionPath);
+    if (HoodiePartitionMetadata.hasPartitionMetadata(storage,  
partitionStoragePath)) {
+      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, 
partitionStoragePath);
       metadata.readFromFS();
       int levels = metadata.getPartitionDepth();
       baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels);
     } else {
       for (int i = 0; i < partitionPath.depth(); i++) {
-        if (storage.exists(new StoragePath(new StoragePath(baseDir.toUri()), 
METAFOLDER_NAME))) {
+        if (storage.exists(new StoragePath(convertToStoragePath(baseDir), 
METAFOLDER_NAME))) {
           break;
         } else if (i == partitionPath.depth() - 1) {
           throw new TableNotFoundException(partitionPath.toString());
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 84c28106ef8..a66f3264e33 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -27,7 +27,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -68,6 +67,7 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRealtimeRecordReaderUtils.class);
@@ -311,7 +311,7 @@ public class HoodieRealtimeRecordReaderUtils {
   public static HoodieFileReader getBaseFileReader(Path path, JobConf conf) 
throws IOException {
     HoodieConfig hoodieConfig = 
getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
     return 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-        .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), new 
StoragePath(path.toUri()));
+        .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), 
convertToStoragePath(path));
   }
 
   private static Schema appendNullSchemaFields(Schema schema, List<String> 
newFieldNames) {
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index cb54219593b..aa2e277edc9 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -44,7 +44,6 @@ import 
org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -279,7 +278,7 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
           .getFileReader(
               DEFAULT_HUDI_CONFIG_FOR_READER,
               metaClient.getStorageConf(),
-              new StoragePath(fileSlice.getBaseFile().get().getPath())));
+              fileSlice.getBaseFile().get().getStoragePath()));
       return new CloseableMappingIterator<>(reader.getRecordIterator(schema), 
HoodieRecord::getData);
     } else {
       // If there is no data file, fall back to reading log files
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ead5d721710..10685b624bc 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -46,12 +46,12 @@ import 
org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.io.storage.HoodieFileReaderFactory
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -429,7 +429,7 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
             .asJava)
 
         fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
-          val relativePath = getRelativePartitionPath(new 
StoragePath(basePath.toUri), partitionPath)
+          val relativePath = 
getRelativePartitionPath(convertToStoragePath(basePath), partitionPath)
           fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, 
ts).iterator().asScala
         }.toSeq
 
@@ -491,14 +491,15 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   protected def getPartitionColumnsAsInternalRowInternal(file: 
StoragePathInfo, basePath: Path,
                                                          
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
     if (extractPartitionValuesFromPartitionPath) {
-      val tablePathWithoutScheme = new 
StoragePath(basePath.toUri).getPathWithoutSchemeAndAuthority
-      val partitionPathWithoutScheme = new 
StoragePath(file.getPath.getParent.toUri).getPathWithoutSchemeAndAuthority
+      val baseStoragePath = convertToStoragePath(basePath)
+      val tablePathWithoutScheme = 
baseStoragePath.getPathWithoutSchemeAndAuthority
+      val partitionPathWithoutScheme = 
file.getPath.getParent.getPathWithoutSchemeAndAuthority
       val relativePath = 
tablePathWithoutScheme.toUri.relativize(partitionPathWithoutScheme.toUri).toString
       val timeZoneId = conf.get("timeZone", 
sparkSession.sessionState.conf.sessionLocalTimeZone)
       val rowValues = HoodieSparkUtils.parsePartitionColumnValues(
         partitionColumns,
         relativePath,
-        new StoragePath(basePath.toUri),
+        baseStoragePath,
         tableStructSchema,
         timeZoneId,
         sparkAdapter.getSparkParsePartitionUtil,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
index 3a498d98a96..761f2ae49b9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.storage.{HoodieStorage, 
StorageConfiguration, StoragePath
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.LoggerFactory
 
@@ -80,7 +81,7 @@ class DedupeSparkJob(basePath: String,
       .setConf(storage.getConf.newInstance())
       .setBasePath(basePath).build()
 
-    val allFiles = storage.listDirectEntries(new 
StoragePath(s"$basePath/$duplicatedPartitionPath"))
+    val allFiles = storage.listDirectEntries(new StoragePath(basePath, 
duplicatedPartitionPath))
     val fsView = new HoodieTableFileSystemView(metadata, 
metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), 
allFiles)
     val latestFiles: java.util.List[HoodieBaseFile] = 
fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
     val filteredStatuses = latestFiles.asScala.map(f => f.getPath)
@@ -191,7 +192,7 @@ class DedupeSparkJob(basePath: String,
       .setConf(storage.getConf.newInstance())
       .setBasePath(basePath).build()
 
-    val allFiles = storage.listDirectEntries(new 
StoragePath(s"$basePath/$duplicatedPartitionPath"))
+    val allFiles = storage.listDirectEntries(new StoragePath(basePath, 
duplicatedPartitionPath))
     val fsView = new HoodieTableFileSystemView(metadata, 
metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), 
allFiles)
 
     val latestFiles: java.util.List[HoodieBaseFile] = 
fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
@@ -204,8 +205,8 @@ class DedupeSparkJob(basePath: String,
       val badSuffix = if (dupeFixPlan.contains(fileName)) ".bad" else ""
       val dstPath = new 
Path(s"$repairOutputPath/${filePath.getName}$badSuffix")
       LOG.info(s"Copying from $filePath to $dstPath")
-      FileIOUtils.copy(storage, new StoragePath(filePath.toUri), storage,
-        new StoragePath(dstPath.toUri), false, true)
+      FileIOUtils.copy(storage, convertToStoragePath(filePath), storage,
+        convertToStoragePath(dstPath), false, true)
     }
 
     // 2. Remove duplicates from the bad files
@@ -216,7 +217,7 @@ class DedupeSparkJob(basePath: String,
       LOG.info(" Skipping and writing new file for : " + fileName)
       SparkHelpers.skipKeysAndWriteNewFile(instantTime,
         storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], 
storage, badFilePath, newFilePath, dupeFixPlan(fileName))
-      storage.deleteFile(new StoragePath(badFilePath.toUri))
+      storage.deleteFile(badFilePath)
     }
 
     // 3. Check that there are no duplicates anymore.
@@ -249,8 +250,8 @@ class DedupeSparkJob(basePath: String,
       } else {
         // for real
         LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath")
-        FileIOUtils.copy(storage, new StoragePath(srcPath.toUri), storage,
-          new StoragePath(dstPath.toUri), false, true)
+        FileIOUtils.copy(storage, convertToStoragePath(srcPath), storage,
+          convertToStoragePath(dstPath), false, true)
       }
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index 1b47ad02a5f..3992e43d1d2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.storage.{HoodieStorage, 
HoodieStorageUtils, StoragePath}
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.specific.SpecificData
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
@@ -119,7 +120,7 @@ class ExportInstantsProcedure extends BaseProcedure with 
ProcedureBuilder with L
     for (fs <- statuses.asScala) {
       // read the archived file
       val reader = HoodieLogFormat.newReader(
-        storage, new HoodieLogFile(new StoragePath(fs.getPath.toUri)), 
HoodieArchivedMetaEntry.getClassSchema)
+        storage, new HoodieLogFile(convertToStoragePath(fs.getPath)), 
HoodieArchivedMetaEntry.getClassSchema)
       // read the avro blocks
       while ( {
         reader.hasNext && copyCount < limit
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index a3b64589b26..cfda38a446f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -99,7 +99,7 @@ class RepairMigratePartitionMetaProcedure extends 
BaseProcedure with ProcedureBu
     }
     val props: Properties = new Properties
     
props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key, 
"true")
-    HoodieTableConfig.update(metaClient.getStorage, new 
StoragePath(metaClient.getMetaPath), props)
+    HoodieTableConfig.update(metaClient.getStorage, metaClient.getMetaPath, 
props)
 
     rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
index 07b4992dbc8..3273c737747 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.storage.StoragePath
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -70,8 +68,7 @@ class RepairOverwriteHoodiePropsProcedure extends 
BaseProcedure with ProcedureBu
     var newProps = new Properties
     loadNewProps(overwriteFilePath, newProps)
     val oldProps = metaClient.getTableConfig.propsMap
-    val metaPathDir = new StoragePath(tablePath, METAFOLDER_NAME)
-    HoodieTableConfig.create(metaClient.getStorage, metaPathDir, newProps)
+    HoodieTableConfig.create(metaClient.getStorage, metaClient.getMetaPath, 
newProps)
     // reload new props as checksum would have been added
     newProps = HoodieTableMetaClient.reload(metaClient).getTableConfig.getProps
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
index a47b756c4b2..adce16e7193 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
@@ -22,10 +22,8 @@ import 
org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.storage.HoodieStorageUtils
+import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
-import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfter
 
 import java.io.File
@@ -83,7 +81,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with 
BeforeAndAfter {
       assertResult(true)(Files.exists(Paths.get(s"$tablePath/$partitionVal")))
       assertResult(HoodieTableType.MERGE_ON_READ)(new HoodieTableConfig(
         HoodieStorageUtils.getStorage(tablePath, 
HoodieTestUtils.getDefaultStorageConf),
-        s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
+        new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME),
         HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue,
         HoodieTableConfig.RECORD_MERGER_STRATEGY.defaultValue).getTableType)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
index 70ca15f2315..bc6f266eb94 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -113,7 +113,7 @@ class TestUpgradeOrDowngradeProcedure extends 
HoodieSparkProcedureTestBase {
       // delete checksum from hoodie.properties
       val props = ConfigUtils.fetchConfigs(
         storage,
-        metaPathDir.toString,
+        metaPathDir,
         HoodieTableConfig.HOODIE_PROPERTIES_FILE,
         HoodieTableConfig.HOODIE_PROPERTIES_FILE_BACKUP,
         1,
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
index 11213b56e26..bce28e8ae9c 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
@@ -88,7 +88,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable 
implements Runnable {
       // and the markers from the requests pending processing.
       currentInstantAllMarkers.addAll(markerHandler.getAllMarkers(markerDir));
       currentInstantAllMarkers.addAll(pendingMarkers);
-      StoragePath tempPath = new StoragePath(basePath + StoragePath.SEPARATOR 
+ HoodieTableMetaClient.TEMPFOLDER_NAME);
+      StoragePath tempPath = new StoragePath(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME);
 
       List<StoragePath> instants = MarkerUtils.getAllMarkerDir(tempPath, 
storage);
 

Reply via email to