This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6a9b1d98c6b537aa449f7a6d3e573726f9f79e74
Author: Jon Vexler <[email protected]>
AuthorDate: Wed May 15 06:46:17 2024 -0700

    [HUDI-7743] Improve StoragePath usages (#11189)
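
    The theme of the change: build StoragePath values with the
    parent/child constructor instead of string concatenation and
    re-parsing, and pass StoragePath through APIs that previously took a
    String. A minimal before/after sketch (the wrapper class is made up
    for illustration; both calls appear in the hunks below):

        import org.apache.hudi.common.table.HoodieTableConfig;
        import org.apache.hudi.common.table.HoodieTableMetaClient;
        import org.apache.hudi.storage.StoragePath;

        class StoragePathUsageSketch {
          // Before: manual separator handling; the concatenated String is
          // re-parsed by the StoragePath(String) constructor.
          static StoragePath before(HoodieTableMetaClient metaClient) {
            return new StoragePath(
                metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
          }

          // After: the StoragePath(parent, child) constructor reuses the
          // already-parsed parent path.
          static StoragePath after(HoodieTableMetaClient metaClient) {
            return new StoragePath(
                metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
          }
        }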
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/cli/commands/ArchivedCommitsCommand.java  | 19 ++++++++--------
 .../apache/hudi/cli/commands/RepairsCommand.java   | 11 ++++-----
 .../org/apache/hudi/cli/commands/TableCommand.java | 11 +++------
 .../apache/hudi/cli/commands/TimelineCommand.java  |  4 ++--
 .../apache/hudi/cli/commands/TestTableCommand.java |  4 ++--
 .../cli/commands/TestUpgradeDowngradeCommand.java  |  4 ++--
 .../hudi/client/heartbeat/HeartbeatUtils.java      |  2 +-
 .../client/heartbeat/HoodieHeartbeatClient.java    |  4 ++--
 .../index/bucket/ConsistentBucketIndexUtils.java   |  8 +++----
 .../org/apache/hudi/io/HoodieKeyLookupHandle.java  |  3 +--
 .../java/org/apache/hudi/io/HoodieReadHandle.java  |  5 ++---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  2 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  3 +--
 .../java/org/apache/hudi/table/HoodieTable.java    |  4 ++--
 .../table/action/commit/HoodieMergeHelper.java     |  3 +--
 .../table/action/index/RunIndexActionExecutor.java |  3 +--
 .../rollback/ListingBasedRollbackStrategy.java     |  4 ++--
 .../hudi/table/upgrade/UpgradeDowngrade.java       |  6 ++---
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  2 +-
 .../apache/hudi/io/FlinkWriteHandleFactory.java    |  4 +++-
 .../io/storage/row/HoodieRowDataCreateHandle.java  |  7 ++++--
 .../row/HoodieRowDataFileWriterFactory.java        |  4 ++--
 .../org/apache/hudi/table/HoodieJavaTable.java     |  5 ++---
 .../index/bloom/HoodieFileProbingFunction.java     |  3 +--
 .../org/apache/hudi/table/HoodieSparkTable.java    |  5 ++---
 .../functional/TestHoodieBackedMetadata.java       |  4 ++--
 .../TestHoodieSparkMergeOnReadTableRollback.java   |  4 ++--
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   | 16 ++++++-------
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  2 +-
 .../common/heartbeat/HoodieHeartbeatUtils.java     |  2 +-
 .../hudi/common/table/HoodieTableConfig.java       |  8 +++----
 .../hudi/common/table/HoodieTableMetaClient.java   |  6 ++---
 .../table/timeline/HoodieActiveTimeline.java       |  4 ++--
 .../view/HoodieTablePreCommitFileSystemView.java   |  2 +-
 .../io/FileBasedInternalSchemaStorageManager.java  |  5 ++---
 .../metadata/FileSystemBackedTableMetadata.java    |  2 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  4 ++--
 .../secondary/index/SecondaryIndexManager.java     |  7 +++---
 .../hudi/sink/bootstrap/BootstrapOperator.java     |  3 +--
 .../java/org/apache/hudi/util/StreamerUtil.java    |  2 +-
 .../hudi/sink/bucket/ITTestBucketStreamWrite.java  |  2 +-
 .../common/config/DFSPropertiesConfiguration.java  |  2 +-
 .../common/bootstrap/index/TestBootstrapIndex.java |  3 +--
 .../fs/TestFSUtilsWithRetryWrapperEnable.java      |  8 +++----
 .../hudi/common/table/TestHoodieTableConfig.java   | 26 +++++++++++-----------
 .../common/table/TestHoodieTableMetaClient.java    |  2 +-
 .../hadoop/HoodieCopyOnWriteTableInputFormat.java  |  4 ++--
 .../hudi/hadoop/HoodieHFileRecordReader.java       |  3 ++-
 .../hudi/hadoop/HoodieROTablePathFilter.java       |  8 ++++---
 .../apache/hudi/hadoop/SchemaEvolutionContext.java |  5 +++--
 .../HoodieMergeOnReadTableInputFormat.java         |  3 +--
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  8 ++++---
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  4 ++--
 .../reader/DFSHoodieDatasetInputReader.java        |  3 +--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 11 ++++-----
 .../org/apache/spark/sql/hudi/DedupeSparkJob.scala | 15 +++++++------
 .../procedures/ExportInstantsProcedure.scala       |  3 ++-
 .../RepairMigratePartitionMetaProcedure.scala      |  2 +-
 .../RepairOverwriteHoodiePropsProcedure.scala      |  5 +----
 .../apache/spark/sql/hudi/common/TestSqlConf.scala |  6 ++---
 .../MarkerBasedEarlyConflictDetectionRunnable.java |  2 +-
 61 files changed, 156 insertions(+), 170 deletions(-)
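
As a reading aid for the hunks below, the recurring API shifts are
collected in one hedged sketch (only calls visible in this diff are
assumed; the wrapper class and the "hoodie.properties" literal are
illustrative):

    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;
    import org.apache.hudi.storage.StoragePath;

    class StoragePathMigrationSketch {
      static void sketch(HoodieTableMetaClient metaClient, HoodieBaseFile baseFile, Path hadoopPath) {
        // HoodieTableMetaClient#getMetaPath() now returns StoragePath; call
        // sites that still need a String or a Hadoop Path convert explicitly.
        StoragePath metaPath = metaClient.getMetaPath();
        Path propsFile = new Path(metaPath.toString(), "hoodie.properties");

        // Hadoop Path values are converted through the shared helper instead
        // of round-tripping via toUri() or toString().
        StoragePath converted = HadoopFSUtils.convertToStoragePath(hadoopPath);

        // Model objects hand out their StoragePath directly, avoiding
        // new StoragePath(baseFile.getPath()) re-parsing.
        StoragePath baseFilePath = baseFile.getStoragePath();

        // The table base path is consumed as a StoragePath via getBasePathV2().
        StoragePath basePath = metaClient.getBasePathV2();
      }
    }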

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 921d12fb663..50e71f370db 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.HoodieStorageUtils;
@@ -105,19 +106,17 @@ public class ArchivedCommitsCommand {
               defaultValue = "false") final boolean headerOnly)
       throws IOException {
     System.out.println("===============> Showing only " + limit + " archived commits <===============");
-    String basePath = HoodieCLI.getTableMetaClient().getBasePath();
-    StoragePath archivePath = new StoragePath(
-        HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
-    if (folder != null && !folder.isEmpty()) {
-      archivePath = new StoragePath(basePath + "/.hoodie/" + folder);
-    }
-    List<StoragePathInfo> pathInfoList =
-        HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    StoragePath archivePath = folder != null && !folder.isEmpty()
+        ? new StoragePath(metaClient.getMetaPath(), folder)
+        : new StoragePath(metaClient.getArchivePath(), ".commits_.archive*");
+    HoodieStorage storage = HoodieStorageUtils.getStorage(metaClient.getBasePathV2(), HoodieCLI.conf);
+    List<StoragePathInfo> pathInfoList = storage.globEntries(archivePath);
     List<Comparable[]> allStats = new ArrayList<>();
     for (StoragePathInfo pathInfo : pathInfoList) {
       // read the archived file
-      try (Reader reader = HoodieLogFormat.newReader(HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf),
-          new HoodieLogFile(pathInfo.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+      try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()),
+          HoodieArchivedMetaEntry.getClassSchema())) {
         List<IndexedRecord> readRecords = new ArrayList<>();
         // read the avro blocks
         while (reader.hasNext()) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 0eedbf964fe..8783e749057 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -57,8 +57,6 @@ import java.util.stream.Collectors;
 
 import scala.collection.JavaConverters;
 
-import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
-
 /**
  * CLI command to display and trigger repair options.
  */
@@ -123,7 +121,7 @@ public class RepairsCommand {
         client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
     List<String> partitionPaths =
         FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.storage, client.getBasePath());
-    StoragePath basePath = new StoragePath(client.getBasePath());
+    StoragePath basePath = client.getBasePathV2();
     String[][] rows = new String[partitionPaths.size()][];
 
     int ind = 0;
@@ -163,8 +161,7 @@ public class RepairsCommand {
       newProps.load(fileInputStream);
     }
     Map<String, String> oldProps = client.getTableConfig().propsMap();
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), METAFOLDER_NAME);
-    HoodieTableConfig.create(client.getStorage(), metaPathDir, newProps);
+    HoodieTableConfig.create(client.getStorage(), client.getMetaPath(), newProps);
     // reload new props as checksum would have been added
     newProps =
         HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
@@ -230,7 +227,7 @@ public class RepairsCommand {
     HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(HoodieCLI.conf);
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, client.getBasePath(), false, false);
-    StoragePath basePath = new StoragePath(client.getBasePath());
+    StoragePath basePath = client.getBasePathV2();
 
     String[][] rows = new String[partitionPaths.size()][];
     int ind = 0;
@@ -276,7 +273,7 @@ public class RepairsCommand {
 
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true");
-    HoodieTableConfig.update(HoodieCLI.storage, new StoragePath(client.getMetaPath()), props);
+    HoodieTableConfig.update(HoodieCLI.storage, client.getMetaPath(), props);
 
     return HoodiePrintHelper.print(new String[] {
         HoodieTableHeaderFields.HEADER_PARTITION_PATH,
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index c0e6a2cc801..9c1946ae171 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -51,7 +50,6 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
@@ -189,8 +187,7 @@ public class TableCommand {
   public String recoverTableConfig() throws IOException {
     HoodieCLI.refreshTableMetadata();
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), METAFOLDER_NAME);
-    HoodieTableConfig.recover(client.getStorage(), metaPathDir);
+    HoodieTableConfig.recover(client.getStorage(), client.getMetaPath());
     return descTable();
   }
 
@@ -205,8 +202,7 @@ public class TableCommand {
     try (FileInputStream fileInputStream = new FileInputStream(updatePropsFilePath)) {
       updatedProps.load(fileInputStream);
     }
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), METAFOLDER_NAME);
-    HoodieTableConfig.update(client.getStorage(), metaPathDir, updatedProps);
+    HoodieTableConfig.update(client.getStorage(), client.getMetaPath(), updatedProps);
 
     HoodieCLI.refreshTableMetadata();
     Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
@@ -221,8 +217,7 @@ public class TableCommand {
     Map<String, String> oldProps = client.getTableConfig().propsMap();
 
     Set<String> deleteConfigs = Arrays.stream(csConfigs.split(",")).collect(Collectors.toSet());
-    StoragePath metaPathDir = new StoragePath(client.getBasePath(), METAFOLDER_NAME);
-    HoodieTableConfig.delete(client.getStorage(), metaPathDir, deleteConfigs);
+    HoodieTableConfig.delete(client.getStorage(), client.getMetaPath(), deleteConfigs);
 
     HoodieCLI.refreshTableMetadata();
     Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
index 6dbba62af49..8cb6fb72180 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
@@ -174,10 +174,10 @@ public class TimelineCommand {
   }
 
   private Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> getInstantInfoFromTimeline(
-      HoodieStorage storage, String metaPath) throws IOException {
+      HoodieStorage storage, StoragePath metaPath) throws IOException {
     Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> instantMap = new HashMap<>();
     Stream<HoodieInstantWithModTime> instantStream =
-        HoodieTableMetaClient.scanFiles(storage, new StoragePath(metaPath), path -> {
+        HoodieTableMetaClient.scanFiles(storage, metaPath, path -> {
           // Include only the meta files with extensions that needs to be included
           String extension = HoodieInstant.getTimelineFileExtension(path.getName());
           return HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE.contains(extension);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
index 9dc4852e30d..c3bbbef0cf4 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
@@ -131,7 +131,7 @@ public class TestTableCommand extends CLIFunctionalTestHarness {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     assertEquals(archivePath, client.getArchivePath());
     assertEquals(tablePath, client.getBasePath());
-    assertEquals(metaPath, client.getMetaPath());
+    assertEquals(metaPath, client.getMetaPath().toString());
     assertEquals(HoodieTableType.COPY_ON_WRITE, client.getTableType());
     assertEquals(new Integer(1), client.getTimelineLayoutVersion().getVersion());
   }
@@ -149,7 +149,7 @@ public class TestTableCommand extends CLIFunctionalTestHarness {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     assertEquals(metaPath + StoragePath.SEPARATOR + "archive", client.getArchivePath());
     assertEquals(tablePath, client.getBasePath());
-    assertEquals(metaPath, client.getMetaPath());
+    assertEquals(metaPath, client.getMetaPath().toString());
     assertEquals(HoodieTableType.MERGE_ON_READ, client.getTableType());
   }
 
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
index 5211da14b18..9d1169b4245 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
@@ -117,7 +117,7 @@ public class TestUpgradeDowngradeCommand extends CLIFunctionalTestHarness {
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FIVE);
     try (OutputStream os = metaClient.getStorage().create(
         new StoragePath(
-            metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE),
+            metaClient.getMetaPath(), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE),
         true)) {
       metaClient.getTableConfig().getProps().store(os, "");
     }
@@ -167,7 +167,7 @@ public class TestUpgradeDowngradeCommand extends CLIFunctionalTestHarness {
   private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException {
     StoragePath propertyFile =
         new StoragePath(
-            metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+            metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     // Load the properties and verify
     InputStream inputStream = metaClient.getStorage().open(propertyFile);
     HoodieConfig config = new HoodieConfig();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
index e7e8e6c1b5a..dcdc45932c2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
@@ -54,7 +54,7 @@ public class HeartbeatUtils {
     boolean deleted = false;
     try {
       String heartbeatFolderPath = HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
-      deleted = storage.deleteFile(new StoragePath(heartbeatFolderPath + StoragePath.SEPARATOR + instantTime));
+      deleted = storage.deleteFile(new StoragePath(heartbeatFolderPath, instantTime));
       if (!deleted) {
         LOG.error("Failed to delete heartbeat for instant " + instantTime);
       } else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
index 460ebdfd11e..0238f6e7f45 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
@@ -227,7 +227,7 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
 
   public static Boolean heartbeatExists(HoodieStorage storage, String basePath, String instantTime) throws IOException {
     StoragePath heartbeatFilePath = new StoragePath(
-        HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + StoragePath.SEPARATOR + instantTime);
+        HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime);
     return storage.exists(heartbeatFilePath);
   }
 
@@ -255,7 +255,7 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
       Long newHeartbeatTime = System.currentTimeMillis();
       OutputStream outputStream =
           this.storage.create(
-              new StoragePath(heartbeatFolderPath + StoragePath.SEPARATOR + instantTime), true);
+              new StoragePath(heartbeatFolderPath, instantTime), true);
       outputStream.close();
       Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
       if (heartbeat.getLastHeartbeatTime() != null && isHeartbeatExpired(instantTime)) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index 069ec9e5b74..99b5d833f50 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -58,6 +58,7 @@ import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHI
 import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
 import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Utilities class for consistent bucket index metadata management.
@@ -211,8 +212,8 @@ public class ConsistentBucketIndexUtils {
    */
   private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException {
     HoodieStorage storage = table.getMetaClient().getStorage();
-    StoragePath fullPath = new StoragePath(
-        partitionPath.toString(), getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    StoragePath fullPath = new StoragePath(convertToStoragePath(partitionPath),
+        getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
     if (storage.exists(fullPath)) {
       return;
     }
@@ -239,8 +240,7 @@ public class ConsistentBucketIndexUtils {
     if (metaFile == null) {
       return Option.empty();
     }
-    try (InputStream is = table.getMetaClient().getStorage().open(
-        new StoragePath(metaFile.getPath().toUri()))) {
+    try (InputStream is = table.getMetaClient().getStorage().open(convertToStoragePath(metaFile.getPath()))) {
       byte[] content = FileIOUtils.readAsByteArray(is);
       return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
     } catch (FileNotFoundException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index e573b9b026e..664192d454d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -26,7 +26,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import org.slf4j.Logger;
@@ -101,7 +100,7 @@ public class HoodieKeyLookupHandle<T, I, K, O> extends HoodieReadHandle<T, I, K,
     }
 
     HoodieBaseFile baseFile = getLatestBaseFile();
-    List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new StoragePath(baseFile.getPath()), candidateRecordKeys,
+    List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(baseFile.getStoragePath(), candidateRecordKeys,
         hoodieTable.getStorageConf());
     LOG.info(
         String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 03227b75f64..5f9afc1bad1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -25,7 +25,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import java.io.IOException;
@@ -71,11 +70,11 @@ public abstract class HoodieReadHandle<T, I, K, O> extends HoodieIOHandle<T, I,
 
   protected HoodieFileReader createNewFileReader() throws IOException {
     return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
-        .getFileReader(config, hoodieTable.getStorageConf(), new StoragePath(getLatestBaseFile().getPath()));
+        .getFileReader(config, hoodieTable.getStorageConf(), getLatestBaseFile().getStoragePath());
   }
 
   protected HoodieFileReader createNewFileReader(HoodieBaseFile hoodieBaseFile) throws IOException {
     return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
-        .getFileReader(config, hoodieTable.getStorageConf(), new StoragePath(hoodieBaseFile.getPath()));
+        .getFileReader(config, hoodieTable.getStorageConf(), hoodieBaseFile.getStoragePath());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 486102b5222..f51f3d1c279 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -123,7 +123,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
 
-    return new StoragePath(path.toString(), FSUtils.makeBaseFileName(instantTime, writeToken, fileId,
+    return new StoragePath(path, FSUtils.makeBaseFileName(instantTime, writeToken, fileId,
         hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension()));
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4646cc2ec11..445c7b74fff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -594,8 +594,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
     StorageConfiguration<?> storageConf = dataMetaClient.getStorageConf();
     final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
-    final String datasetBasePath = dataMetaClient.getBasePathV2().toString();
-    StoragePath storageBasePath = new StoragePath(datasetBasePath);
+    StoragePath storageBasePath = dataMetaClient.getBasePathV2();
 
     while (!pathsToList.isEmpty()) {
       // In each round we will list a section of directories
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 58ea31bed21..009e02277f5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -1047,10 +1047,10 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
     if (clearAll && partitions.size() > 0) {
       LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
       metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
-      HoodieTableConfig.update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+      HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
     } else if (partitionType.isPresent() && partitions.remove(partitionType.get().getPartitionPath())) {
       metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
-      HoodieTableConfig.update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+      HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
     }
   }
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 3dc2c6f5ed1..38383fd7a88 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -110,8 +110,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
       ClosableIterator<HoodieRecord> recordIterator;
       Schema recordSchema;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        StoragePath bootstrapFilePath =
-            new StoragePath(baseFile.getBootstrapBaseFile().get().getPath());
+        StoragePath bootstrapFilePath = baseFile.getBootstrapBaseFile().get().getStoragePath();
         StorageConfiguration<?> bootstrapFileConfig = table.getStorageConf().newInstance();
         bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
             baseFileReader,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index c971ac10646..5ad4e5e9f39 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -40,7 +40,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.metadata.HoodieMetadataMetrics;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 
@@ -214,7 +213,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
     table.getMetaClient().getTableConfig().setValue(TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightPartitions));
     table.getMetaClient().getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
     HoodieTableConfig.update(table.getMetaClient().getStorage(),
-        new StoragePath(table.getMetaClient().getMetaPath()), table.getMetaClient().getTableConfig().getProps());
+        table.getMetaClient().getMetaPath(), table.getMetaClient().getTableConfig().getProps());
 
     // delete metadata partition
     requestedPartitions.forEach(partition -> {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index e6eca0924bd..39f6d8c3ca1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -35,7 +35,6 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -58,6 +57,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
 
 /**
@@ -303,7 +303,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
         return commit.equals(fileCommitTime);
       } else if (HadoopFSUtils.isLogFile(path)) {
         // Since the baseCommitTime is the only commit for new log files, it's okay here
-        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(new StoragePath(path.toUri()));
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(convertToStoragePath(path));
         return commit.equals(fileCommitTime);
       }
       return false;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 03c715e01e7..b5177a5746b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -58,8 +58,8 @@ public class UpgradeDowngrade {
     this.metaClient = metaClient;
     this.config = config;
     this.context = context;
-    this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE);
-    this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+    this.updatedPropsFilePath = new Path(metaClient.getMetaPath().toString(), HOODIE_UPDATED_PROPERTY_FILE);
+    this.propsFilePath = new Path(metaClient.getMetaPath().toString(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     this.upgradeDowngradeHelper = upgradeDowngradeHelper;
   }
 
@@ -158,7 +158,7 @@ public class UpgradeDowngrade {
     metaClient.getTableConfig().setTableVersion(toVersion);
 
     HoodieTableConfig.update(metaClient.getStorage(),
-        new StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+        metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
   }
 
   protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 78c35f0d2c6..be48ec3ab82 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -133,7 +133,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
    * @return the marker file name thus curated.
    */
   private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable<?, ?, ?, ?> table) {
-    StoragePath logPath = new StoragePath(table.getMetaClient().getBasePath(), logFilePath);
+    StoragePath logPath = new StoragePath(table.getMetaClient().getBasePathV2(), logFilePath);
     String fileId = FSUtils.getFileIdFromLogPath(logPath);
     String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
     String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
index 188a92663ee..4bc55408cbb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path;
 import java.util.Iterator;
 import java.util.Map;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
 /**
  * Factory clazz for flink write handles.
  */
@@ -108,7 +110,7 @@ public class FlinkWriteHandleFactory {
       Path writePath = bucketToHandles.get(fileID);
       if (writePath != null) {
         HoodieWriteHandle<?, ?, ?, ?> writeHandle =
-            createReplaceHandle(config, instantTime, table, recordItr, partitionPath, fileID, new StoragePath(writePath.toUri()));
+            createReplaceHandle(config, instantTime, table, recordItr, partitionPath, fileID, convertToStoragePath(writePath));
         bucketToHandles.put(fileID, new Path(((MiniBatchHandle) writeHandle).getWritePath().toUri())); // override with new replace handle
         return writeHandle;
       }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 4227e14165f..5915a3eda36 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -50,6 +50,8 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
 /**
  * Create handle with RowData for datasource implementation of bulk insert.
  */
@@ -172,9 +174,10 @@ public class HoodieRowDataCreateHandle implements Serializable {
     stat.setNumInserts(writeStatus.getTotalRecords());
     stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
     stat.setFileId(fileId);
-    stat.setPath(new StoragePath(writeConfig.getBasePath()), new StoragePath(path.toUri()));
+    StoragePath storagePath = convertToStoragePath(path);
+    stat.setPath(new StoragePath(writeConfig.getBasePath()), storagePath);
     long fileSizeInBytes = FSUtils.getFileSize(
-        table.getMetaClient().getStorage(), new StoragePath(path.toUri()));
+        table.getMetaClient().getStorage(), storagePath);
     stat.setTotalWriteBytes(fileSizeInBytes);
     stat.setFileSizeInBytes(fileSizeInBytes);
     stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index e9bc86b4a76..be757a30954 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 
@@ -34,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import java.io.IOException;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Factory to assist in instantiating a new {@link HoodieRowDataFileWriter}.
@@ -71,7 +71,7 @@ public class HoodieRowDataFileWriterFactory {
     HoodieRowDataParquetWriteSupport writeSupport =
         new HoodieRowDataParquetWriteSupport((Configuration) table.getStorageConf().unwrap(), rowType, filter);
     return new HoodieRowDataParquetWriter(
-        new StoragePath(path.toUri()), new HoodieParquetConfig<>(
+        convertToStoragePath(path), new HoodieParquetConfig<>(
         writeSupport,
         writeConfig.getParquetCompressionCodec(),
         writeConfig.getParquetBlockSize(),
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 1538c1c00b0..2e13da6c201 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -35,7 +35,6 @@ import org.apache.hudi.index.JavaHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.JavaHoodieBackedTableMetadataWriter;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import java.io.IOException;
@@ -93,8 +92,8 @@ public abstract class HoodieJavaTable<T>
       // delete metadata partitions corresponding to such indexes
       deleteMetadataIndexIfNecessary();
       try {
-        if (isMetadataTableExists || metaClient.getStorage().exists(new StoragePath(
-            HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+        if (isMetadataTableExists || metaClient.getStorage().exists(
+            HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2()))) {
           isMetadataTableExists = true;
           return Option.of(metadataWriter);
         }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
index 667b00ada22..59bbbec3dd4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -29,7 +29,6 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -127,7 +126,7 @@ public class HoodieFileProbingFunction implements
             // TODO add assertion that file is checked only once
 
             final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
-            List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new StoragePath(dataFile.getPath()),
+            List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(dataFile.getStoragePath(),
                 candidateRecordKeys, storageConf);
 
             LOG.debug(
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 9b408ca0d84..b1fc87338bf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -38,7 +38,6 @@ import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.commit.HoodieMergeHelper;
 
 import org.apache.hadoop.conf.Configuration;
@@ -111,8 +110,8 @@ public abstract class HoodieSparkTable<T>
           context.getStorageConf(), config, failedWritesCleaningPolicy, context,
           Option.of(triggeringInstantTimestamp));
       try {
-        if (isMetadataTableExists || metaClient.getStorage().exists(new StoragePath(
-            HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+        if (isMetadataTableExists || metaClient.getStorage().exists(
+            HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2()))) {
           isMetadataTableExists = true;
           return Option.of(metadataWriter);
         }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index f2f689d1bd4..9301529c740 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1957,7 +1957,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     // collect all commit meta files from metadata table.
     List<StoragePathInfo> metaFiles = metaClient.getStorage()
-        .listDirectEntries(new StoragePath(metaClient.getMetaPath() + "/metadata/.hoodie"));
+        .listDirectEntries(new StoragePath(metaClient.getMetaPath(), "metadata/.hoodie"));
     List<StoragePathInfo> commit3Files = metaFiles.stream()
         .filter(pathInfo ->
             pathInfo.getPath().getName().contains(commit3 + "." + HoodieTimeline.DELTA_COMMIT_ACTION))
@@ -3700,7 +3700,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     metaClient.getTableConfig().setTableVersion(version);
     StoragePath propertyFile = new StoragePath(
-        metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+        metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     try (OutputStream os = metaClient.getStorage().create(propertyFile)) {
       metaClient.getTableConfig().getProps().store(os, "");
     }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 1abc05058ec..10d26f83698 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -918,7 +918,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends TestHoodieSparkRoll
       for (HoodieInstant.State state : Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT)) {
         HoodieInstant toCopy = new HoodieInstant(state, HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime);
         File file = Files.createTempFile(tempFolder, null, null).toFile();
-        fs().copyToLocalFile(new Path(metaClient.getMetaPath(), toCopy.getFileName()),
+        fs().copyToLocalFile(new Path(metaClient.getMetaPath().toString(), toCopy.getFileName()),
             new Path(file.getAbsolutePath()));
         fileNameMap.put(file.getAbsolutePath(), toCopy.getFileName());
       }
@@ -944,7 +944,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends TestHoodieSparkRoll
       for (Map.Entry<String, String> entry : fileNameMap.entrySet()) {
         try {
           fs().copyFromLocalFile(new Path(entry.getKey()),
-              new Path(metaClient.getMetaPath(), entry.getValue()));
+              new Path(metaClient.getMetaPath().toString(), entry.getValue()));
         } catch (IOException e) {
           throw new HoodieIOException("Error copying state from local disk.", e);
         }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 10a77f9b5b7..e25db7d5924 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -509,8 +509,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     metaClient = HoodieTestUtils.init(storageConf, basePath, getTableType(), properties);
     // set hoodie.table.version to 4 in hoodie.properties file
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FOUR);
-    HoodieTableConfig.update(metaClient.getStorage(),
-        new StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+    HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(),
+        metaClient.getTableConfig().getProps());
 
     String metadataTablePath =
         HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2().toString());
@@ -519,8 +519,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
           .setConf(metaClient.getStorageConf().newInstance()).setBasePath(metadataTablePath).build();
       metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FOUR);
       HoodieTableConfig.update(
-          mdtMetaClient.getStorage(),
-          new StoragePath(mdtMetaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+          mdtMetaClient.getStorage(), mdtMetaClient.getMetaPath(),
+          metaClient.getTableConfig().getProps());
     }
 
     assertTableVersionOnDataAndMetadataTable(metaClient, HoodieTableVersion.FOUR);
@@ -902,7 +902,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
   private void prepForDowngradeFromVersion(HoodieTableVersion fromVersion) throws IOException {
     metaClient.getTableConfig().setTableVersion(fromVersion);
    StoragePath propertyFile = new StoragePath(
-        metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+        metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     try (OutputStream os = metaClient.getStorage().create(propertyFile)) {
       metaClient.getTableConfig().getProps().store(os, "");
     }
@@ -910,9 +910,9 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
 
   private void createResidualFile() throws IOException {
     Path propertyFile =
-        new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+        new Path(metaClient.getMetaPath().toString(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     Path updatedPropertyFile =
-        new Path(metaClient.getMetaPath() + "/" + UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
+        new Path(metaClient.getMetaPath().toString(), UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
 
     // Step1: Copy hoodie.properties to hoodie.properties.orig
     FileSystem fs = (FileSystem) metaClient.getStorage().getFileSystem();
@@ -938,7 +938,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     assertEquals(expectedVersion.versionCode(),
         metaClient.getTableConfig().getTableVersion().versionCode());
     StoragePath propertyFile = new StoragePath(
-        metaClient.getMetaPath() + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+        metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     // Load the properties and verify
     InputStream inputStream = metaClient.getStorage().open(propertyFile);
     HoodieConfig config = new HoodieConfig();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index ec13861b849..ecbe3fc1766 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -95,7 +95,7 @@ public class FSUtils {
    * @return {@code true} if table exists. {@code false} otherwise.
    */
   public static boolean isTableExists(String path, HoodieStorage storage) throws IOException {
-    return storage.exists(new StoragePath(path + "/" + HoodieTableMetaClient.METAFOLDER_NAME));
+    return storage.exists(new StoragePath(path, HoodieTableMetaClient.METAFOLDER_NAME));
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
index 0631ed587f1..7e6ce0e2135 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
@@ -46,7 +46,7 @@ public class HoodieHeartbeatUtils {
   public static Long getLastHeartbeatTime(HoodieStorage storage, String basePath,
                                           String instantTime) throws IOException {
     StoragePath heartbeatFilePath = new StoragePath(
-        HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + StoragePath.SEPARATOR + instantTime);
+        HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime);
     if (storage.exists(heartbeatFilePath)) {
       return storage.getPathInfo(heartbeatFilePath).getModificationTime();
     } else {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 2acf8bc6f93..f6dcdce1c34 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -273,12 +273,12 @@ public class HoodieTableConfig extends HoodieConfig {
   // Delay between retries while reading the properties file
   private static final int READ_RETRY_DELAY_MSEC = 1000;
 
-  public HoodieTableConfig(HoodieStorage storage, String metaPath, String payloadClassName, String recordMergerStrategyId) {
+  public HoodieTableConfig(HoodieStorage storage, StoragePath metaPath, String payloadClassName, String recordMergerStrategyId) {
     super();
     StoragePath propertyPath = new StoragePath(metaPath, HOODIE_PROPERTIES_FILE);
     LOG.info("Loading table properties from " + propertyPath);
     try {
-      this.props = fetchConfigs(storage, metaPath);
+      this.props = fetchConfigs(storage, metaPath.toString());
       boolean needStore = false;
       if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null
           && !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) {
@@ -782,7 +782,7 @@ public class HoodieTableConfig extends HoodieConfig {
     }
     setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
     setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
-    update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), getProps());
+    update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
     LOG.info(String.format("MDT %s partition %s has been %s", metaClient.getBasePathV2(), partitionType.name(), enabled ? "enabled" : "disabled"));
   }
 
@@ -800,7 +800,7 @@ public class HoodieTableConfig extends HoodieConfig {
     });
 
     setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
-    update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), getProps());
+    update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
     LOG.info(String.format("MDT %s partitions %s have been set to inflight", metaClient.getBasePathV2(), partitionTypes));
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index bedf0204bf8..4105677e03d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -132,7 +132,7 @@ public class HoodieTableMetaClient implements Serializable {
     this.metaPath = new StoragePath(basePath, METAFOLDER_NAME);
     this.storage = getStorage();
     TableNotFoundException.checkTableValidity(storage, this.basePath, metaPath);
-    this.tableConfig = new HoodieTableConfig(storage, metaPath.toString(), payloadClassName, recordMergerStrategy);
+    this.tableConfig = new HoodieTableConfig(storage, metaPath, payloadClassName, recordMergerStrategy);
     this.tableType = tableConfig.getTableType();
     Option<TimelineLayoutVersion> tableConfigVersion = tableConfig.getTimelineLayoutVersion();
     if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
@@ -212,8 +212,8 @@ public class HoodieTableMetaClient implements Serializable {
   /**
    * @return Meta path
    */
-  public String getMetaPath() {
-    return metaPath.toString();  // this invocation is cached
+  public StoragePath getMetaPath() {
+    return metaPath;
   }
 
   /**
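
Call sites that still need the meta path as a raw String, such as the Hadoop FileSystem lookups in the tests further below, now convert explicitly, along these lines:

    String metaPathStr = metaClient.getMetaPath().toString();
    FileSystem fs = HadoopFSUtils.getFs(metaPathStr, metaClient.getStorageConf());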
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index cbe1691e318..7f53feb5a54 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -248,7 +248,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
-  public static void deleteInstantFile(HoodieStorage storage, String metaPath, HoodieInstant instant) {
+  public static void deleteInstantFile(HoodieStorage storage, StoragePath metaPath, HoodieInstant instant) {
     try {
       storage.deleteFile(new StoragePath(metaPath, instant.getFileName()));
     } catch (IOException e) {
@@ -665,7 +665,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   private StoragePath getInstantFileNamePath(String fileName) {
-    return new StoragePath(fileName.contains(SCHEMA_COMMIT_ACTION) ? metaClient.getSchemaFolderName() : metaClient.getMetaPath(), fileName);
+    return new StoragePath(fileName.contains(SCHEMA_COMMIT_ACTION) ? metaClient.getSchemaFolderName() : metaClient.getMetaPath().toString(), fileName);
   }
 
   public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
index ea6b8f429bd..9c6c05f4523 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
@@ -71,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView {
     Map<String, HoodieBaseFile> newFilesWrittenForPartition = filesWritten.stream()
         .filter(file -> partitionStr.equals(file.getPartitionPath()))
         .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat ->
-            new HoodieBaseFile(new StoragePath(tableMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, null)));
+            new HoodieBaseFile(new StoragePath(tableMetaClient.getBasePathV2(), writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, null)));
 
     Stream<HoodieBaseFile> committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
     Map<String, HoodieBaseFile> allFileIds = committedBaseFiles
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
index 6e4945628cf..43923b5e40a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
@@ -59,14 +59,13 @@ public class FileBasedInternalSchemaStorageManager extends AbstractInternalSchem
   private HoodieTableMetaClient metaClient;
 
   public FileBasedInternalSchemaStorageManager(StorageConfiguration<?> conf, StoragePath baseTablePath) {
-    StoragePath metaPath = new StoragePath(baseTablePath, ".hoodie");
+    StoragePath metaPath = new StoragePath(baseTablePath, HoodieTableMetaClient.METAFOLDER_NAME);
     this.baseSchemaPath = new StoragePath(metaPath, SCHEMA_NAME);
     this.conf = conf;
   }
 
   public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient metaClient) {
-    StoragePath metaPath = new StoragePath(metaClient.getBasePath(), ".hoodie");
-    this.baseSchemaPath = new StoragePath(metaPath, SCHEMA_NAME);
+    this.baseSchemaPath = new StoragePath(metaClient.getMetaPath(), SCHEMA_NAME);
     this.conf = metaClient.getStorageConf();
     this.metaClient = metaClient;
   }
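
Both constructors now resolve the schema folder through the meta path rather than a hard-coded ".hoodie" literal, keeping the folder name in one place:

    // preferred spelling; SCHEMA_NAME is this class's schema folder constant
    StoragePath baseSchemaPath = new StoragePath(metaClient.getMetaPath(), SCHEMA_NAME);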
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 18a58df9320..1148503c5a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -85,7 +85,7 @@ public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata {
     StoragePath metaPath =
         new StoragePath(dataBasePath, HoodieTableMetaClient.METAFOLDER_NAME);
     TableNotFoundException.checkTableValidity(storage, this.dataBasePath, metaPath);
-    HoodieTableConfig tableConfig = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig tableConfig = new HoodieTableConfig(storage, metaPath, null, null);
     this.hiveStylePartitioningEnabled =
         Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
     this.urlEncodePartitioningEnabled =
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 55c9a49b61c..68932a5224f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -445,9 +445,9 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     // If the base file is present then create a reader
     Option<HoodieBaseFile> basefile = slice.getBaseFile();
     if (basefile.isPresent()) {
-      String baseFilePath = basefile.get().getPath();
+      StoragePath baseFilePath = basefile.get().getStoragePath();
       baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
-          .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), new StoragePath(baseFilePath));
+          .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), baseFilePath);
       baseFileOpenMs = timer.endTimer();
       LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath,
           basefile.get().getCommitTime(), baseFileOpenMs));
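
The pattern above recurs throughout the change: wherever a base file's location is needed as a StoragePath, HoodieBaseFile's getStoragePath() accessor replaces re-parsing the string form:

    StoragePath path = baseFile.getStoragePath();  // instead of new StoragePath(baseFile.getPath())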
diff --git a/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
index 0e7dbf83c51..8d769d99bf5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieSecondaryIndexException;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -125,7 +124,7 @@ public class SecondaryIndexManager {
     Properties updatedProps = new Properties();
     updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
         SecondaryIndexUtils.toJsonString(newSecondaryIndexes));
-    HoodieTableConfig.update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), updatedProps);
+    HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), updatedProps);
 
     LOG.info("Success to add secondary index metadata: {}", secondaryIndexToAdd);
 
@@ -157,9 +156,9 @@ public class SecondaryIndexManager {
       Properties updatedProps = new Properties();
       updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
           SecondaryIndexUtils.toJsonString(secondaryIndexesToKeep));
-      HoodieTableConfig.update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), updatedProps);
+      HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), updatedProps);
     } else {
-      HoodieTableConfig.delete(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()),
+      HoodieTableConfig.delete(metaClient.getStorage(), metaClient.getMetaPath(),
           CollectionUtils.createSet(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key()));
     }
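
With HoodieTableConfig.update and HoodieTableConfig.delete accepting the meta client's StoragePath directly, a typical property update reduces to (a sketch; the JSON payload is illustrative):

    Properties updatedProps = new Properties();
    updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(), secondaryIndexJson);
    HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), updatedProps);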
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index b15e52969ef..54f302a85fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -41,7 +41,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.sink.meta.CkpMetadata;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.util.FlinkTables;
@@ -221,7 +220,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
             return;
           }
           try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(
-              HadoopFSUtils.getStorageConf(this.hadoopConf), new StoragePath(baseFile.getPath()))) {
+              HadoopFSUtils.getStorageConf(this.hadoopConf), baseFile.getStoragePath())) {
             iterator.forEachRemaining(hoodieKey -> {
             output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
             });
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e8926638294..128a7385bf0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -321,7 +321,7 @@ public class StreamerUtil {
     StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
     try {
       if (storage.exists(new StoragePath(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE))) {
-        return Option.of(new HoodieTableConfig(storage, metaPath.toString(), null, null));
+        return Option.of(new HoodieTableConfig(storage, metaPath, null, null));
       }
     } catch (IOException e) {
       throw new HoodieIOException("Get table config error", e);
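
The helper keeps its probe-then-load shape: it checks for hoodie.properties under the meta folder and only then constructs the config. A caller might unwrap the returned Option roughly as follows (assuming the enclosing helper is StreamerUtil's table-config lookup):

    Option<HoodieTableConfig> tableConfig = StreamerUtil.getTableConfig(basePath, hadoopConf);
    if (tableConfig.isPresent()) {
      // the table exists; reuse its payload class, merge strategy, etc.
    }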
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
index 2e334a7554c..29560768266 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -110,7 +110,7 @@ public class ITTestBucketStreamWrite {
 
     // delete successful commit to simulate an unsuccessful write
     HoodieStorage storage = metaClient.getStorage();
-    StoragePath path = new StoragePath(metaClient.getMetaPath() + StoragePath.SEPARATOR + filename);
+    StoragePath path = new StoragePath(metaClient.getMetaPath(), filename);
     storage.deleteDirectory(path);
 
     // marker types are different for COW and MOR
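
Several call sites swap manual separator concatenation for StoragePath's parent/child constructor, which handles the separator itself:

    // before: new StoragePath(metaClient.getMetaPath() + StoragePath.SEPARATOR + filename)
    StoragePath path = new StoragePath(metaClient.getMetaPath(), filename);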
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
index 662c2ffe35a..2e3f546debe 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
@@ -64,7 +64,7 @@ public class DFSPropertiesConfiguration extends PropertiesConfig {
   public static final String CONF_FILE_DIR_ENV_NAME = "HUDI_CONF_DIR";
   public static final String DEFAULT_CONF_FILE_DIR = "file:/etc/hudi/conf";
   public static final StoragePath DEFAULT_PATH = new StoragePath(
-      DEFAULT_CONF_FILE_DIR + "/" + DEFAULT_PROPERTIES_FILE);
+      DEFAULT_CONF_FILE_DIR, DEFAULT_PROPERTIES_FILE);
 
   // props read from hudi-defaults.conf
   private static TypedProperties GLOBAL_PROPS = loadGlobalProps();
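
The two-argument constructor resolves to the same default location as the old string concatenation, i.e. DEFAULT_CONF_FILE_DIR joined with DEFAULT_PROPERTIES_FILE under a single separator:

    StoragePath defaultPath = new StoragePath(DEFAULT_CONF_FILE_DIR, DEFAULT_PROPERTIES_FILE);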
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
index a9f19c7ee01..7cf65ce1caa 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.permission.FsAction;
 import org.junit.jupiter.api.AfterEach;
@@ -100,7 +99,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
     props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), "false");
     Properties properties = new Properties();
     properties.putAll(props);
-    HoodieTableConfig.create(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), properties);
+    HoodieTableConfig.create(metaClient.getStorage(), metaClient.getMetaPath(), properties);
 
     metaClient = createMetaClient(metaClient.getStorageConf().newInstance(), basePath);
     BootstrapIndex bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
index 2093e658c4e..7eb2901c1d3 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
@@ -70,7 +70,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
     initialRetryIntervalMs = fileSystemRetryConfig.getInitialRetryIntervalMs();
 
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), metaClient.getStorageConf()), 2);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf()), 2);
     FileSystem fileSystem =
         new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers,
             initialRetryIntervalMs, "");
@@ -85,7 +85,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
   @Test
   public void testProcessFilesWithExceptions() throws Exception {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf()), 100);
     FileSystem fileSystem =
         new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers,
             initialRetryIntervalMs, "");
@@ -102,7 +102,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
   @Test
   public void testGetSchema() {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf()), 100);
     FileSystem fileSystem =
         new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers,
             initialRetryIntervalMs, "");
@@ -114,7 +114,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
   @Test
   public void testGetDefaultReplication() {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf()), 100);
     FileSystem fileSystem =
         new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers,
             initialRetryIntervalMs, "");
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 297ddda2091..fe7e57c5443 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -79,7 +79,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
   public void testCreate() throws IOException {
     assertTrue(
         storage.exists(new StoragePath(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE)));
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(6, config.getProps().size());
   }
 
@@ -92,7 +92,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
 
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(7, config.getProps().size());
     assertEquals("test-table2", config.getTableName());
     assertEquals("new_field", config.getPreCombineField());
@@ -106,7 +106,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
 
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(5, config.getProps().size());
     assertNull(config.getProps().getProperty("hoodie.invalid.config"));
     assertFalse(config.getProps().contains(HoodieTableConfig.ARCHIVELOG_FOLDER.key()));
@@ -116,13 +116,13 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
   public void testReadsWhenPropsFileDoesNotExist() throws IOException {
     storage.deleteFile(cfgPath);
     assertThrows(HoodieIOException.class, () -> {
-      new HoodieTableConfig(storage, metaPath.toString(), null, null);
+      new HoodieTableConfig(storage, metaPath, null, null);
     });
   }
 
   @Test
   public void testReadsWithUpdateFailures() throws IOException {
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     storage.deleteFile(cfgPath);
     try (OutputStream out = storage.create(backupCfgPath)) {
       config.getProps().store(out, "");
@@ -130,14 +130,14 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
 
     assertFalse(storage.exists(cfgPath));
     assertTrue(storage.exists(backupCfgPath));
-    config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(6, config.getProps().size());
   }
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testUpdateRecovery(boolean shouldPropsFileExist) throws IOException {
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     if (!shouldPropsFileExist) {
       storage.deleteFile(cfgPath);
     }
@@ -148,7 +148,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     HoodieTableConfig.recoverIfNeeded(storage, cfgPath, backupCfgPath);
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
-    config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(6, config.getProps().size());
   }
 
@@ -156,11 +156,11 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
   public void testReadRetry() throws IOException {
     // When both the hoodie.properties and hoodie.properties.backup do not exist then the read fails
     storage.rename(cfgPath, new StoragePath(cfgPath.toString() + ".bak"));
-    assertThrows(HoodieIOException.class, () -> new HoodieTableConfig(storage, metaPath.toString(), null, null));
+    assertThrows(HoodieIOException.class, () -> new HoodieTableConfig(storage, metaPath, null, null));
 
     // Should return the backup config if hoodie.properties is not present
     storage.rename(new StoragePath(cfgPath.toString() + ".bak"), backupCfgPath);
-    new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    new HoodieTableConfig(storage, metaPath, null, null);
 
     // Should return backup config if hoodie.properties is corrupted
     Properties props = new Properties();
@@ -168,14 +168,14 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     try (OutputStream out = storage.create(cfgPath)) {
       props.store(out, "Wrong checksum in file so is invalid");
     }
-    new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    new HoodieTableConfig(storage, metaPath, null, null);
 
     // Should throw exception if both hoodie.properties and backup are corrupted
     try (OutputStream out = storage.create(backupCfgPath)) {
       props.store(out, "Wrong checksum in file so is invalid");
     }
     assertThrows(IllegalArgumentException.class, () -> new HoodieTableConfig(storage,
-        metaPath.toString(), null, null));
+        metaPath, null, null));
   }
 
   @Test
@@ -193,7 +193,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     Future readerFuture = executor.submit(() -> {
       for (int i = 0; i < 100; i++) {
         // Try to load the table properties, won't throw any exception
-        new HoodieTableConfig(storage, metaPath.toString(), null, null);
+        new HoodieTableConfig(storage, metaPath, null, null);
       }
     });
 
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index decdb2d7d24..9bbc72289f5 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -59,7 +59,7 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
     assertEquals(HoodieTestUtils.RAW_TRIPS_TEST_NAME, metaClient.getTableConfig().getTableName(),
         "Table name should be raw_trips");
     assertEquals(basePath, metaClient.getBasePath(), "Basepath should be the one assigned");
-    assertEquals(basePath + "/.hoodie", metaClient.getMetaPath(),
+    assertEquals(basePath + "/.hoodie", metaClient.getMetaPath().toString(),
         "Metapath should be ${basepath}/.hoodie");
     assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key()));
     assertTrue(HoodieTableConfig.validateChecksum(metaClient.getTableConfig().getProps()));
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 2484df8daa4..33f9fdf829f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -33,7 +33,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,6 +61,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
@@ -250,7 +250,7 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
               tableMetaClient,
               props,
               HoodieTableQueryType.SNAPSHOT,
-              partitionPaths.stream().map(e -> new StoragePath(e.toUri())).collect(Collectors.toList()),
+              partitionPaths.stream().map(HadoopFSUtils::convertToStoragePath).collect(Collectors.toList()),
               queryCommitInstant,
               shouldIncludePendingCommits);
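
new StoragePath(path.toUri()) gives way to HadoopFSUtils.convertToStoragePath wherever a Hadoop Path crosses into the hudi-storage API:

    // hadoopPath would typically come from an InputSplit or FileStatus
    org.apache.hadoop.fs.Path hadoopPath = fileSplit.getPath();
    StoragePath storagePath = HadoopFSUtils.convertToStoragePath(hadoopPath);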
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 4110f47385b..97177ab260d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.RecordReader;
 import java.io.IOException;
 
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieHFileRecordReader implements RecordReader<NullWritable, ArrayWritable> {
 
@@ -54,7 +55,7 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
 
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
-    StoragePath path = new StoragePath(fileSplit.getPath().toUri());
+    StoragePath path = convertToStoragePath(fileSplit.getPath());
     HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
     reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), path, HoodieFileFormat.HFILE, Option.empty());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index d6a62f3a061..51d8a9f3af4 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -53,6 +53,7 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
 import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
@@ -133,7 +134,7 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
     try {
       if (storage == null) {
         storage =
-            HoodieStorageUtils.getStorage(new StoragePath(path.toUri()), conf);
+            HoodieStorageUtils.getStorage(convertToStoragePath(path), conf);
       }
 
       // Assumes path is a file
@@ -166,8 +167,9 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
 
       // Perform actual checking.
       Path baseDir;
-      if (HoodiePartitionMetadata.hasPartitionMetadata(storage, new StoragePath(folder.toUri()))) {
-        HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, new StoragePath(folder.toUri()));
+      StoragePath storagePath = convertToStoragePath(folder);
+      if (HoodiePartitionMetadata.hasPartitionMetadata(storage, storagePath)) {
+        HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, storagePath);
         metadata.readFromFS();
         baseDir = HoodieHiveUtils.getNthParent(folder, metadata.getPartitionDepth());
       } else {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index 454aa519bd5..79829cc3917 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -71,6 +71,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
 /**
  * This class is responsible for calculating names and types of fields that are actual at a certain point in time for hive.
  * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time.
@@ -114,10 +116,9 @@ public class SchemaEvolutionContext {
   private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
     try {
       Path inputPath = ((FileSplit) split).getPath();
-      StoragePath path = new StoragePath(inputPath.toString());
       FileSystem fs = inputPath.getFileSystem(job);
       HoodieStorage storage = HoodieStorageUtils.getStorage(fs);
-      Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, path);
+      Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, convertToStoragePath(inputPath));
       return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString())
           .setConf(HadoopFSUtils.getStorageConfWithCopy(job)).build();
     } catch (Exception e) {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 2af8e92baab..fac2336836b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -44,7 +44,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
 import org.apache.avro.Schema;
@@ -194,7 +193,7 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
 
     // build fileGroup from fsView
     List<StoragePathInfo> affectedPathInfoList = HoodieInputFormatUtils
-        .listAffectedFilesForCommits(job, new StoragePath(tableMetaClient.getBasePath()),
+        .listAffectedFilesForCommits(job, tableMetaClient.getBasePathV2(),
             metadataList);
     // step3
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
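
getBasePathV2() returns the base path as a StoragePath, while the legacy getBasePath() on branch-0.x returns a String, so passing the typed accessor through avoids a parse-and-rebuild round trip:

    StoragePath basePath = tableMetaClient.getBasePathV2();   // typed accessor
    // instead of: new StoragePath(tableMetaClient.getBasePath())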
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 9db661daf81..6945b241e0a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -82,6 +82,7 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADAT
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieInputFormatUtils {
 
@@ -360,14 +361,15 @@ public class HoodieInputFormatUtils {
     Path baseDir = partitionPath;
     HoodieStorage storage = HoodieStorageUtils.getStorage(
         partitionPath.toString(), HadoopFSUtils.getStorageConf(conf));
-    if (HoodiePartitionMetadata.hasPartitionMetadata(storage, new StoragePath(partitionPath.toUri()))) {
-      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, new StoragePath(partitionPath.toUri()));
+    StoragePath partitionStoragePath = convertToStoragePath(partitionPath);
+    if (HoodiePartitionMetadata.hasPartitionMetadata(storage, partitionStoragePath)) {
+      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, partitionStoragePath);
       metadata.readFromFS();
       int levels = metadata.getPartitionDepth();
       baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels);
     } else {
       for (int i = 0; i < partitionPath.depth(); i++) {
-        if (storage.exists(new StoragePath(new StoragePath(baseDir.toUri()), METAFOLDER_NAME))) {
+        if (storage.exists(new StoragePath(convertToStoragePath(baseDir), METAFOLDER_NAME))) {
           break;
         } else if (i == partitionPath.depth() - 1) {
           throw new TableNotFoundException(partitionPath.toString());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index f160307dcf9..666e51b81de 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -26,7 +26,6 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -67,6 +66,7 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeRecordReaderUtils.class);
@@ -308,7 +308,7 @@ public class HoodieRealtimeRecordReaderUtils {
   public static HoodieFileReader getBaseFileReader(Path path, JobConf conf) throws IOException {
     HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
     return HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-        .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), new StoragePath(path.toUri()));
+        .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), convertToStoragePath(path));
   }
 
   private static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 298618e60c6..0fcae011638 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -44,7 +44,6 @@ import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -279,7 +278,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
           .getFileReader(
               DEFAULT_HUDI_CONFIG_FOR_READER,
               metaClient.getStorageConf(),
-              new StoragePath(fileSlice.getBaseFile().get().getPath())));
+              fileSlice.getBaseFile().get().getStoragePath()));
       return new CloseableMappingIterator<>(reader.getRecordIterator(schema), HoodieRecord::getData);
     } else {
       // If there is no data file, fall back to reading log files
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index cafed4e5e70..ee815188d8e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -46,12 +46,12 @@ import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.io.storage.HoodieFileReaderFactory
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -429,7 +429,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
             .asJava)
 
         fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
-          val relativePath = getRelativePartitionPath(new StoragePath(basePath.toUri), partitionPath)
+          val relativePath = getRelativePartitionPath(convertToStoragePath(basePath), partitionPath)
           fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, ts).iterator().asScala
         }.toSeq
 
@@ -487,14 +487,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected def getPartitionColumnsAsInternalRowInternal(file: StoragePathInfo, basePath: Path,
                                                          extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
     if (extractPartitionValuesFromPartitionPath) {
-      val tablePathWithoutScheme = new StoragePath(basePath.toUri).getPathWithoutSchemeAndAuthority
-      val partitionPathWithoutScheme = new StoragePath(file.getPath.getParent.toUri).getPathWithoutSchemeAndAuthority
+      val baseStoragePath = convertToStoragePath(basePath)
+      val tablePathWithoutScheme = baseStoragePath.getPathWithoutSchemeAndAuthority
+      val partitionPathWithoutScheme = file.getPath.getParent.getPathWithoutSchemeAndAuthority
       val relativePath = tablePathWithoutScheme.toUri.relativize(partitionPathWithoutScheme.toUri).toString
       val timeZoneId = conf.get("timeZone", sparkSession.sessionState.conf.sessionLocalTimeZone)
       val rowValues = HoodieSparkUtils.parsePartitionColumnValues(
         partitionColumns,
         relativePath,
-        new StoragePath(basePath.toUri),
+        baseStoragePath,
         tableStructSchema,
         timeZoneId,
         sparkAdapter.getSparkParsePartitionUtil,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
index 3a498d98a96..761f2ae49b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.LoggerFactory
 
@@ -80,7 +81,7 @@ class DedupeSparkJob(basePath: String,
       .setConf(storage.getConf.newInstance())
       .setBasePath(basePath).build()
 
-    val allFiles = storage.listDirectEntries(new StoragePath(s"$basePath/$duplicatedPartitionPath"))
+    val allFiles = storage.listDirectEntries(new StoragePath(basePath, duplicatedPartitionPath))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
     val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
     val filteredStatuses = latestFiles.asScala.map(f => f.getPath)
@@ -191,7 +192,7 @@ class DedupeSparkJob(basePath: String,
       .setConf(storage.getConf.newInstance())
       .setBasePath(basePath).build()
 
-    val allFiles = storage.listDirectEntries(new StoragePath(s"$basePath/$duplicatedPartitionPath"))
+    val allFiles = storage.listDirectEntries(new StoragePath(basePath, duplicatedPartitionPath))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
 
     val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
@@ -204,8 +205,8 @@ class DedupeSparkJob(basePath: String,
       val badSuffix = if (dupeFixPlan.contains(fileName)) ".bad" else ""
       val dstPath = new Path(s"$repairOutputPath/${filePath.getName}$badSuffix")
       LOG.info(s"Copying from $filePath to $dstPath")
-      FileIOUtils.copy(storage, new StoragePath(filePath.toUri), storage,
-        new StoragePath(dstPath.toUri), false, true)
+      FileIOUtils.copy(storage, convertToStoragePath(filePath), storage,
+        convertToStoragePath(dstPath), false, true)
     }
 
     // 2. Remove duplicates from the bad files
@@ -216,7 +217,7 @@ class DedupeSparkJob(basePath: String,
       LOG.info(" Skipping and writing new file for : " + fileName)
       SparkHelpers.skipKeysAndWriteNewFile(instantTime,
         storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], storage, badFilePath, newFilePath, dupeFixPlan(fileName))
-      storage.deleteFile(new StoragePath(badFilePath.toUri))
+      storage.deleteFile(badFilePath)
     }
 
     // 3. Check that there are no duplicates anymore.
@@ -249,8 +250,8 @@ class DedupeSparkJob(basePath: String,
       } else {
         // for real
         LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath")
-        FileIOUtils.copy(storage, new StoragePath(srcPath.toUri), storage,
-          new StoragePath(dstPath.toUri), false, true)
+        FileIOUtils.copy(storage, convertToStoragePath(srcPath), storage,
+          convertToStoragePath(dstPath), false, true)
       }
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index abcd13105dc..68d9c93fc7b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StoragePath}
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.specific.SpecificData
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
@@ -118,7 +119,7 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
     for (fs <- statuses.asScala) {
       // read the archived file
       val reader = HoodieLogFormat.newReader(
-        storage, new HoodieLogFile(new StoragePath(fs.getPath.toUri)), HoodieArchivedMetaEntry.getClassSchema)
+        storage, new HoodieLogFile(convertToStoragePath(fs.getPath)), HoodieArchivedMetaEntry.getClassSchema)
       // read the avro blocks
       while ( {
         reader.hasNext && copyCount < limit
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index 60cc9714a55..b9f43e12e66 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -98,7 +98,7 @@ class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBu
     }
     val props: Properties = new Properties
     props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key, "true")
-    HoodieTableConfig.update(metaClient.getStorage, new StoragePath(metaClient.getMetaPath), props)
+    HoodieTableConfig.update(metaClient.getStorage, metaClient.getMetaPath, props)
 
     rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
index 07b4992dbc8..3273c737747 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.storage.StoragePath
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -70,8 +68,7 @@ class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with ProcedureBu
     var newProps = new Properties
     loadNewProps(overwriteFilePath, newProps)
     val oldProps = metaClient.getTableConfig.propsMap
-    val metaPathDir = new StoragePath(tablePath, METAFOLDER_NAME)
-    HoodieTableConfig.create(metaClient.getStorage, metaPathDir, newProps)
+    HoodieTableConfig.create(metaClient.getStorage, metaClient.getMetaPath, newProps)
     // reload new props as checksum would have been added
     newProps = HoodieTableMetaClient.reload(metaClient).getTableConfig.getProps
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
index a47b756c4b2..adce16e7193 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
@@ -22,10 +22,8 @@ import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.storage.HoodieStorageUtils
+import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
-import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfter
 
 import java.io.File
@@ -83,7 +81,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
       assertResult(true)(Files.exists(Paths.get(s"$tablePath/$partitionVal")))
       assertResult(HoodieTableType.MERGE_ON_READ)(new HoodieTableConfig(
         HoodieStorageUtils.getStorage(tablePath, HoodieTestUtils.getDefaultStorageConf),
-        s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
+        new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME),
         HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue,
         HoodieTableConfig.RECORD_MERGER_STRATEGY.defaultValue).getTableType)
 
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
index 11213b56e26..bce28e8ae9c 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
@@ -88,7 +88,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable implements Runnable {
       // and the markers from the requests pending processing.
       currentInstantAllMarkers.addAll(markerHandler.getAllMarkers(markerDir));
       currentInstantAllMarkers.addAll(pendingMarkers);
-      StoragePath tempPath = new StoragePath(basePath + StoragePath.SEPARATOR + HoodieTableMetaClient.TEMPFOLDER_NAME);
+      StoragePath tempPath = new StoragePath(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
 
       List<StoragePath> instants = MarkerUtils.getAllMarkerDir(tempPath, storage);
 
