This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
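The pattern behind the commit below, sketched for orientation: string-concatenated path construction is replaced by StoragePath's two-argument parent/child constructor, and call sites move to StoragePath-returning accessors such as HoodieTableMetaClient#getMetaPath() (previously String). A minimal illustration under those assumptions -- the wrapper class and method names here are hypothetical and not part of the patch; only the StoragePath calls mirror it:

    import org.apache.hudi.storage.StoragePath;

    // Hypothetical holder class for illustration.
    public class StoragePathUsageSketch {

      // Before: child path appended by string concatenation, as removed throughout the diff.
      static StoragePath heartbeatFileOld(String heartbeatFolderPath, String instantTime) {
        return new StoragePath(heartbeatFolderPath + StoragePath.SEPARATOR + instantTime);
      }

      // After: the parent/child constructor the patch standardizes on.
      static StoragePath heartbeatFileNew(String heartbeatFolderPath, String instantTime) {
        return new StoragePath(heartbeatFolderPath, instantTime);
      }
    }

Callers that still need a raw String (for example, Hadoop Path interop in the tests) now go through getMetaPath().toString(), as the changes below show.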
commit 6a9b1d98c6b537aa449f7a6d3e573726f9f79e74
Author: Jon Vexler <[email protected]>
AuthorDate: Wed May 15 06:46:17 2024 -0700

    [HUDI-7743] Improve StoragePath usages (#11189)

    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/cli/commands/ArchivedCommitsCommand.java | 19 ++++++++--------
 .../apache/hudi/cli/commands/RepairsCommand.java | 11 ++++-----
 .../org/apache/hudi/cli/commands/TableCommand.java | 11 +++------
 .../apache/hudi/cli/commands/TimelineCommand.java | 4 ++--
 .../apache/hudi/cli/commands/TestTableCommand.java | 4 ++--
 .../cli/commands/TestUpgradeDowngradeCommand.java | 4 ++--
 .../hudi/client/heartbeat/HeartbeatUtils.java | 2 +-
 .../client/heartbeat/HoodieHeartbeatClient.java | 4 ++--
 .../index/bucket/ConsistentBucketIndexUtils.java | 8 +++----
 .../org/apache/hudi/io/HoodieKeyLookupHandle.java | 3 +--
 .../java/org/apache/hudi/io/HoodieReadHandle.java | 5 ++---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java | 2 +-
 .../metadata/HoodieBackedTableMetadataWriter.java | 3 +--
 .../java/org/apache/hudi/table/HoodieTable.java | 4 ++--
 .../table/action/commit/HoodieMergeHelper.java | 3 +--
 .../table/action/index/RunIndexActionExecutor.java | 3 +--
 .../rollback/ListingBasedRollbackStrategy.java | 4 ++--
 .../hudi/table/upgrade/UpgradeDowngrade.java | 6 ++---
 .../table/upgrade/ZeroToOneUpgradeHandler.java | 2 +-
 .../apache/hudi/io/FlinkWriteHandleFactory.java | 4 +++-
 .../io/storage/row/HoodieRowDataCreateHandle.java | 7 ++++--
 .../row/HoodieRowDataFileWriterFactory.java | 4 ++--
 .../org/apache/hudi/table/HoodieJavaTable.java | 5 ++---
 .../index/bloom/HoodieFileProbingFunction.java | 3 +--
 .../org/apache/hudi/table/HoodieSparkTable.java | 5 ++---
 .../functional/TestHoodieBackedMetadata.java | 4 ++--
 .../TestHoodieSparkMergeOnReadTableRollback.java | 4 ++--
 .../hudi/table/upgrade/TestUpgradeDowngrade.java | 16 ++++++-------
 .../java/org/apache/hudi/common/fs/FSUtils.java | 2 +-
 .../common/heartbeat/HoodieHeartbeatUtils.java | 2 +-
 .../hudi/common/table/HoodieTableConfig.java | 8 +++----
 .../hudi/common/table/HoodieTableMetaClient.java | 6 ++---
 .../table/timeline/HoodieActiveTimeline.java | 4 ++--
 .../view/HoodieTablePreCommitFileSystemView.java | 2 +-
 .../io/FileBasedInternalSchemaStorageManager.java | 5 ++---
 .../metadata/FileSystemBackedTableMetadata.java | 2 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java | 4 ++--
 .../secondary/index/SecondaryIndexManager.java | 7 +++---
 .../hudi/sink/bootstrap/BootstrapOperator.java | 3 +--
 .../java/org/apache/hudi/util/StreamerUtil.java | 2 +-
 .../hudi/sink/bucket/ITTestBucketStreamWrite.java | 2 +-
 .../common/config/DFSPropertiesConfiguration.java | 2 +-
 .../common/bootstrap/index/TestBootstrapIndex.java | 3 +--
 .../fs/TestFSUtilsWithRetryWrapperEnable.java | 8 +++----
 .../hudi/common/table/TestHoodieTableConfig.java | 26 +++++++++++-----------
 .../common/table/TestHoodieTableMetaClient.java | 2 +-
 .../hadoop/HoodieCopyOnWriteTableInputFormat.java | 4 ++--
 .../hudi/hadoop/HoodieHFileRecordReader.java | 3 ++-
 .../hudi/hadoop/HoodieROTablePathFilter.java | 8 ++++---
 .../apache/hudi/hadoop/SchemaEvolutionContext.java | 5 +++--
 .../HoodieMergeOnReadTableInputFormat.java | 3 +--
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java | 8 ++++---
 .../utils/HoodieRealtimeRecordReaderUtils.java | 4 ++--
 .../reader/DFSHoodieDatasetInputReader.java | 3 +--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 11 ++++-----
 .../org/apache/spark/sql/hudi/DedupeSparkJob.scala | 15 +++++++------
 .../procedures/ExportInstantsProcedure.scala | 3 ++-
 .../RepairMigratePartitionMetaProcedure.scala | 2 +-
 .../RepairOverwriteHoodiePropsProcedure.scala | 5 +----
 .../apache/spark/sql/hudi/common/TestSqlConf.scala | 6 ++---
 .../MarkerBasedEarlyConflictDetectionRunnable.java | 2 +-
 61 files changed, 156 insertions(+), 170 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 921d12fb663..50e71f370db 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.HoodieStorageUtils;
@@ -105,19 +106,17 @@ public class ArchivedCommitsCommand {
           defaultValue = "false") final boolean headerOnly)
       throws IOException {
     System.out.println("===============> Showing only " + limit + " archived commits <===============");
-    String basePath = HoodieCLI.getTableMetaClient().getBasePath();
-    StoragePath archivePath = new StoragePath(
-        HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
-    if (folder != null && !folder.isEmpty()) {
-      archivePath = new StoragePath(basePath + "/.hoodie/" + folder);
-    }
-    List<StoragePathInfo> pathInfoList =
-        HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    StoragePath archivePath = folder != null && !folder.isEmpty()
+        ? new StoragePath(metaClient.getMetaPath(), folder)
+        : new StoragePath(metaClient.getArchivePath(), ".commits_.archive*");
+    HoodieStorage storage = HoodieStorageUtils.getStorage(metaClient.getBasePathV2(), HoodieCLI.conf);
+    List<StoragePathInfo> pathInfoList = storage.globEntries(archivePath);
     List<Comparable[]> allStats = new ArrayList<>();
     for (StoragePathInfo pathInfo : pathInfoList) {
       // read the archived file
-      try (Reader reader = HoodieLogFormat.newReader(HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf),
-          new HoodieLogFile(pathInfo.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+      try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()),
+          HoodieArchivedMetaEntry.getClassSchema())) {
         List<IndexedRecord> readRecords = new ArrayList<>();
         // read the avro blocks
         while (reader.hasNext()) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 0eedbf964fe..8783e749057 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -57,8 +57,6 @@ import java.util.stream.Collectors;
 
 import scala.collection.JavaConverters;
 
-import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
-
 /**
  * CLI command to display and trigger repair options.
*/ @@ -123,7 +121,7 @@ public class RepairsCommand { client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.storage, client.getBasePath()); - StoragePath basePath = new StoragePath(client.getBasePath()); + StoragePath basePath = client.getBasePathV2(); String[][] rows = new String[partitionPaths.size()][]; int ind = 0; @@ -163,8 +161,7 @@ public class RepairsCommand { newProps.load(fileInputStream); } Map<String, String> oldProps = client.getTableConfig().propsMap(); - StoragePath metaPathDir = new StoragePath(client.getBasePath(), METAFOLDER_NAME); - HoodieTableConfig.create(client.getStorage(), metaPathDir, newProps); + HoodieTableConfig.create(client.getStorage(), client.getMetaPath(), newProps); // reload new props as checksum would have been added newProps = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps(); @@ -230,7 +227,7 @@ public class RepairsCommand { HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(HoodieCLI.conf); HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, client.getBasePath(), false, false); - StoragePath basePath = new StoragePath(client.getBasePath()); + StoragePath basePath = client.getBasePathV2(); String[][] rows = new String[partitionPaths.size()][]; int ind = 0; @@ -276,7 +273,7 @@ public class RepairsCommand { Properties props = new Properties(); props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true"); - HoodieTableConfig.update(HoodieCLI.storage, new StoragePath(client.getMetaPath()), props); + HoodieTableConfig.update(HoodieCLI.storage, client.getMetaPath(), props); return HoodiePrintHelper.print(new String[] { HoodieTableHeaderFields.HEADER_PARTITION_PATH, diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java index c0e6a2cc801..9c1946ae171 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.exception.TableNotFoundException; -import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.slf4j.Logger; @@ -51,7 +50,6 @@ import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; -import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; /** @@ -189,8 +187,7 @@ public class TableCommand { public String recoverTableConfig() throws IOException { HoodieCLI.refreshTableMetadata(); HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); - StoragePath metaPathDir = new StoragePath(client.getBasePath(), METAFOLDER_NAME); - HoodieTableConfig.recover(client.getStorage(), metaPathDir); + HoodieTableConfig.recover(client.getStorage(), client.getMetaPath()); return descTable(); } @@ -205,8 +202,7 @@ public class TableCommand { try (FileInputStream fileInputStream = new FileInputStream(updatePropsFilePath)) { updatedProps.load(fileInputStream); } - StoragePath metaPathDir = new StoragePath(client.getBasePath(), METAFOLDER_NAME); - 
HoodieTableConfig.update(client.getStorage(), metaPathDir, updatedProps); + HoodieTableConfig.update(client.getStorage(), client.getMetaPath(), updatedProps); HoodieCLI.refreshTableMetadata(); Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap(); @@ -221,8 +217,7 @@ public class TableCommand { Map<String, String> oldProps = client.getTableConfig().propsMap(); Set<String> deleteConfigs = Arrays.stream(csConfigs.split(",")).collect(Collectors.toSet()); - StoragePath metaPathDir = new StoragePath(client.getBasePath(), METAFOLDER_NAME); - HoodieTableConfig.delete(client.getStorage(), metaPathDir, deleteConfigs); + HoodieTableConfig.delete(client.getStorage(), client.getMetaPath(), deleteConfigs); HoodieCLI.refreshTableMetadata(); Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java index 6dbba62af49..8cb6fb72180 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java @@ -174,10 +174,10 @@ public class TimelineCommand { } private Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> getInstantInfoFromTimeline( - HoodieStorage storage, String metaPath) throws IOException { + HoodieStorage storage, StoragePath metaPath) throws IOException { Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> instantMap = new HashMap<>(); Stream<HoodieInstantWithModTime> instantStream = - HoodieTableMetaClient.scanFiles(storage, new StoragePath(metaPath), path -> { + HoodieTableMetaClient.scanFiles(storage, metaPath, path -> { // Include only the meta files with extensions that needs to be included String extension = HoodieInstant.getTimelineFileExtension(path.getName()); return HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE.contains(extension); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java index 9dc4852e30d..c3bbbef0cf4 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java @@ -131,7 +131,7 @@ public class TestTableCommand extends CLIFunctionalTestHarness { HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); assertEquals(archivePath, client.getArchivePath()); assertEquals(tablePath, client.getBasePath()); - assertEquals(metaPath, client.getMetaPath()); + assertEquals(metaPath, client.getMetaPath().toString()); assertEquals(HoodieTableType.COPY_ON_WRITE, client.getTableType()); assertEquals(new Integer(1), client.getTimelineLayoutVersion().getVersion()); } @@ -149,7 +149,7 @@ public class TestTableCommand extends CLIFunctionalTestHarness { HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); assertEquals(metaPath + StoragePath.SEPARATOR + "archive", client.getArchivePath()); assertEquals(tablePath, client.getBasePath()); - assertEquals(metaPath, client.getMetaPath()); + assertEquals(metaPath, client.getMetaPath().toString()); assertEquals(HoodieTableType.MERGE_ON_READ, client.getTableType()); } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java index 
5211da14b18..9d1169b4245 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java @@ -117,7 +117,7 @@ public class TestUpgradeDowngradeCommand extends CLIFunctionalTestHarness { metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FIVE); try (OutputStream os = metaClient.getStorage().create( new StoragePath( - metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE), + metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE), true)) { metaClient.getTableConfig().getProps().store(os, ""); } @@ -167,7 +167,7 @@ public class TestUpgradeDowngradeCommand extends CLIFunctionalTestHarness { private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException { StoragePath propertyFile = new StoragePath( - metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); // Load the properties and verify InputStream inputStream = metaClient.getStorage().open(propertyFile); HoodieConfig config = new HoodieConfig(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java index e7e8e6c1b5a..dcdc45932c2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java @@ -54,7 +54,7 @@ public class HeartbeatUtils { boolean deleted = false; try { String heartbeatFolderPath = HoodieTableMetaClient.getHeartbeatFolderPath(basePath); - deleted = storage.deleteFile(new StoragePath(heartbeatFolderPath + StoragePath.SEPARATOR + instantTime)); + deleted = storage.deleteFile(new StoragePath(heartbeatFolderPath, instantTime)); if (!deleted) { LOG.error("Failed to delete heartbeat for instant " + instantTime); } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java index 460ebdfd11e..0238f6e7f45 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java @@ -227,7 +227,7 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable { public static Boolean heartbeatExists(HoodieStorage storage, String basePath, String instantTime) throws IOException { StoragePath heartbeatFilePath = new StoragePath( - HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + StoragePath.SEPARATOR + instantTime); + HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime); return storage.exists(heartbeatFilePath); } @@ -255,7 +255,7 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable { Long newHeartbeatTime = System.currentTimeMillis(); OutputStream outputStream = this.storage.create( - new StoragePath(heartbeatFolderPath + StoragePath.SEPARATOR + instantTime), true); + new StoragePath(heartbeatFolderPath, instantTime), true); outputStream.close(); Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime); if (heartbeat.getLastHeartbeatTime() != null && 
isHeartbeatExpired(instantTime)) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java index 069ec9e5b74..99b5d833f50 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java @@ -58,6 +58,7 @@ import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHI import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX; import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; /** * Utilities class for consistent bucket index metadata management. @@ -211,8 +212,8 @@ public class ConsistentBucketIndexUtils { */ private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException { HoodieStorage storage = table.getMetaClient().getStorage(); - StoragePath fullPath = new StoragePath( - partitionPath.toString(), getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX); + StoragePath fullPath = new StoragePath(convertToStoragePath(partitionPath), + getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX); if (storage.exists(fullPath)) { return; } @@ -239,8 +240,7 @@ public class ConsistentBucketIndexUtils { if (metaFile == null) { return Option.empty(); } - try (InputStream is = table.getMetaClient().getStorage().open( - new StoragePath(metaFile.getPath().toUri()))) { + try (InputStream is = table.getMetaClient().getStorage().open(convertToStoragePath(metaFile.getPath()))) { byte[] content = FileIOUtils.readAsByteArray(is); return Option.of(HoodieConsistentHashingMetadata.fromBytes(content)); } catch (FileNotFoundException e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index e573b9b026e..664192d454d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -26,7 +26,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.io.storage.HoodieFileReader; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.slf4j.Logger; @@ -101,7 +100,7 @@ public class HoodieKeyLookupHandle<T, I, K, O> extends HoodieReadHandle<T, I, K, } HoodieBaseFile baseFile = getLatestBaseFile(); - List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new StoragePath(baseFile.getPath()), candidateRecordKeys, + List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(baseFile.getStoragePath(), candidateRecordKeys, hoodieTable.getStorageConf()); LOG.info( String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java index 03227b75f64..5f9afc1bad1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java @@ -25,7 +25,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.HoodieStorage; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import java.io.IOException; @@ -71,11 +70,11 @@ public abstract class HoodieReadHandle<T, I, K, O> extends HoodieIOHandle<T, I, protected HoodieFileReader createNewFileReader() throws IOException { return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()) - .getFileReader(config, hoodieTable.getStorageConf(), new StoragePath(getLatestBaseFile().getPath())); + .getFileReader(config, hoodieTable.getStorageConf(), getLatestBaseFile().getStoragePath()); } protected HoodieFileReader createNewFileReader(HoodieBaseFile hoodieBaseFile) throws IOException { return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()) - .getFileReader(config, hoodieTable.getStorageConf(), new StoragePath(hoodieBaseFile.getPath())); + .getFileReader(config, hoodieTable.getStorageConf(), hoodieBaseFile.getStoragePath()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 486102b5222..f51f3d1c279 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -123,7 +123,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I, throw new HoodieIOException("Failed to make dir " + path, e); } - return new StoragePath(path.toString(), FSUtils.makeBaseFileName(instantTime, writeToken, fileId, + return new StoragePath(path, FSUtils.makeBaseFileName(instantTime, writeToken, fileId, hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension())); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 4646cc2ec11..445c7b74fff 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -594,8 +594,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism(); StorageConfiguration<?> storageConf = dataMetaClient.getStorageConf(); final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex(); - final String datasetBasePath = dataMetaClient.getBasePathV2().toString(); - StoragePath storageBasePath = new StoragePath(datasetBasePath); + StoragePath storageBasePath = dataMetaClient.getBasePathV2(); while (!pathsToList.isEmpty()) { // In each round we will list a section of directories diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 58ea31bed21..009e02277f5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -1047,10 +1047,10 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { if (clearAll && partitions.size() > 0) { LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties"); metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING); - HoodieTableConfig.update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); + HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps()); } else if (partitionType.isPresent() && partitions.remove(partitionType.get().getPartitionPath())) { metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions)); - HoodieTableConfig.update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); + HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 3dc2c6f5ed1..38383fd7a88 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -110,8 +110,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper { ClosableIterator<HoodieRecord> recordIterator; Schema recordSchema; if (baseFile.getBootstrapBaseFile().isPresent()) { - StoragePath bootstrapFilePath = - new StoragePath(baseFile.getBootstrapBaseFile().get().getPath()); + StoragePath bootstrapFilePath = baseFile.getBootstrapBaseFile().get().getStoragePath(); StorageConfiguration<?> bootstrapFileConfig = table.getStorageConf().newInstance(); bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader( baseFileReader, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java index c971ac10646..5ad4e5e9f39 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java @@ -40,7 +40,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.metadata.HoodieMetadataMetrics; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.MetadataPartitionType; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; @@ -214,7 +213,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, 
table.getMetaClient().getTableConfig().setValue(TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightPartitions)); table.getMetaClient().getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions)); HoodieTableConfig.update(table.getMetaClient().getStorage(), - new StoragePath(table.getMetaClient().getMetaPath()), table.getMetaClient().getTableConfig().getProps()); + table.getMetaClient().getMetaPath(), table.getMetaClient().getTableConfig().getProps()); // delete metadata partition requestedPartitions.forEach(partition -> { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java index e6eca0924bd..39f6d8c3ca1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java @@ -35,7 +35,6 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.fs.FileStatus; @@ -58,6 +57,7 @@ import java.util.stream.Collectors; import static org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING; /** @@ -303,7 +303,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu return commit.equals(fileCommitTime); } else if (HadoopFSUtils.isLogFile(path)) { // Since the baseCommitTime is the only commit for new log files, it's okay here - String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(new StoragePath(path.toUri())); + String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(convertToStoragePath(path)); return commit.equals(fileCommitTime); } return false; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java index 03c715e01e7..b5177a5746b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -58,8 +58,8 @@ public class UpgradeDowngrade { this.metaClient = metaClient; this.config = config; this.context = context; - this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE); - this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); + this.updatedPropsFilePath = new Path(metaClient.getMetaPath().toString(), HOODIE_UPDATED_PROPERTY_FILE); + this.propsFilePath = new Path(metaClient.getMetaPath().toString(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); this.upgradeDowngradeHelper = upgradeDowngradeHelper; } @@ -158,7 +158,7 @@ public class UpgradeDowngrade { metaClient.getTableConfig().setTableVersion(toVersion); 
HoodieTableConfig.update(metaClient.getStorage(), - new StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); + metaClient.getMetaPath(), metaClient.getTableConfig().getProps()); } protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 78c35f0d2c6..be48ec3ab82 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -133,7 +133,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler { * @return the marker file name thus curated. */ private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable<?, ?, ?, ?> table) { - StoragePath logPath = new StoragePath(table.getMetaClient().getBasePath(), logFilePath); + StoragePath logPath = new StoragePath(table.getMetaClient().getBasePathV2(), logFilePath); String fileId = FSUtils.getFileIdFromLogPath(logPath); String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath); String writeToken = FSUtils.getWriteTokenFromLogPath(logPath); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java index 188a92663ee..4bc55408cbb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path; import java.util.Iterator; import java.util.Map; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; + /** * Factory clazz for flink write handles. */ @@ -108,7 +110,7 @@ public class FlinkWriteHandleFactory { Path writePath = bucketToHandles.get(fileID); if (writePath != null) { HoodieWriteHandle<?, ?, ?, ?> writeHandle = - createReplaceHandle(config, instantTime, table, recordItr, partitionPath, fileID, new StoragePath(writePath.toUri())); + createReplaceHandle(config, instantTime, table, recordItr, partitionPath, fileID, convertToStoragePath(writePath)); bucketToHandles.put(fileID, new Path(((MiniBatchHandle) writeHandle).getWritePath().toUri())); // override with new replace handle return writeHandle; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java index 4227e14165f..5915a3eda36 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java @@ -50,6 +50,8 @@ import java.io.IOException; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; + /** * Create handle with RowData for datasource implementation of bulk insert. 
*/ @@ -172,9 +174,10 @@ public class HoodieRowDataCreateHandle implements Serializable { stat.setNumInserts(writeStatus.getTotalRecords()); stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setFileId(fileId); - stat.setPath(new StoragePath(writeConfig.getBasePath()), new StoragePath(path.toUri())); + StoragePath storagePath = convertToStoragePath(path); + stat.setPath(new StoragePath(writeConfig.getBasePath()), storagePath); long fileSizeInBytes = FSUtils.getFileSize( - table.getMetaClient().getStorage(), new StoragePath(path.toUri())); + table.getMetaClient().getStorage(), storagePath); stat.setTotalWriteBytes(fileSizeInBytes); stat.setFileSizeInBytes(fileSizeInBytes); stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java index e9bc86b4a76..be757a30954 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.table.HoodieTable; @@ -34,6 +33,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; /** * Factory to assist in instantiating a new {@link HoodieRowDataFileWriter}. 
@@ -71,7 +71,7 @@ public class HoodieRowDataFileWriterFactory { HoodieRowDataParquetWriteSupport writeSupport = new HoodieRowDataParquetWriteSupport((Configuration) table.getStorageConf().unwrap(), rowType, filter); return new HoodieRowDataParquetWriter( - new StoragePath(path.toUri()), new HoodieParquetConfig<>( + convertToStoragePath(path), new HoodieParquetConfig<>( writeSupport, writeConfig.getParquetCompressionCodec(), writeConfig.getParquetBlockSize(), diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java index 1538c1c00b0..2e13da6c201 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java @@ -35,7 +35,6 @@ import org.apache.hudi.index.JavaHoodieIndexFactory; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.JavaHoodieBackedTableMetadataWriter; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.action.HoodieWriteMetadata; import java.io.IOException; @@ -93,8 +92,8 @@ public abstract class HoodieJavaTable<T> // delete metadata partitions corresponding to such indexes deleteMetadataIndexIfNecessary(); try { - if (isMetadataTableExists || metaClient.getStorage().exists(new StoragePath( - HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) { + if (isMetadataTableExists || metaClient.getStorage().exists( + HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2()))) { isMetadataTableExists = true; return Option.of(metadataWriter); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java index 667b00ada22..59bbbec3dd4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java @@ -29,7 +29,6 @@ import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.io.HoodieKeyLookupResult; import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hudi.storage.StoragePath; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.broadcast.Broadcast; @@ -127,7 +126,7 @@ public class HoodieFileProbingFunction implements // TODO add assertion that file is checked only once final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId); - List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new StoragePath(dataFile.getPath()), + List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(dataFile.getStoragePath(), candidateRecordKeys, storageConf); LOG.debug( diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index 9b408ca0d84..b1fc87338bf 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -38,7 +38,6 @@ import org.apache.hudi.io.HoodieMergeHandle; import 
org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.action.commit.HoodieMergeHelper; import org.apache.hadoop.conf.Configuration; @@ -111,8 +110,8 @@ public abstract class HoodieSparkTable<T> context.getStorageConf(), config, failedWritesCleaningPolicy, context, Option.of(triggeringInstantTimestamp)); try { - if (isMetadataTableExists || metaClient.getStorage().exists(new StoragePath( - HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) { + if (isMetadataTableExists || metaClient.getStorage().exists( + HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2()))) { isMetadataTableExists = true; return Option.of(metadataWriter); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index f2f689d1bd4..9301529c740 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1957,7 +1957,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // collect all commit meta files from metadata table. List<StoragePathInfo> metaFiles = metaClient.getStorage() - .listDirectEntries(new StoragePath(metaClient.getMetaPath() + "/metadata/.hoodie")); + .listDirectEntries(new StoragePath(metaClient.getMetaPath(), "metadata/.hoodie")); List<StoragePathInfo> commit3Files = metaFiles.stream() .filter(pathInfo -> pathInfo.getPath().getName().contains(commit3 + "." 
+ HoodieTimeline.DELTA_COMMIT_ACTION)) @@ -3700,7 +3700,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { metaClient = HoodieTableMetaClient.reload(metaClient); metaClient.getTableConfig().setTableVersion(version); StoragePath propertyFile = new StoragePath( - metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); try (OutputStream os = metaClient.getStorage().create(propertyFile)) { metaClient.getTableConfig().getProps().store(os, ""); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 1abc05058ec..10d26f83698 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -918,7 +918,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends TestHoodieSparkRoll for (HoodieInstant.State state : Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT)) { HoodieInstant toCopy = new HoodieInstant(state, HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime); File file = Files.createTempFile(tempFolder, null, null).toFile(); - fs().copyToLocalFile(new Path(metaClient.getMetaPath(), toCopy.getFileName()), + fs().copyToLocalFile(new Path(metaClient.getMetaPath().toString(), toCopy.getFileName()), new Path(file.getAbsolutePath())); fileNameMap.put(file.getAbsolutePath(), toCopy.getFileName()); } @@ -944,7 +944,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends TestHoodieSparkRoll for (Map.Entry<String, String> entry : fileNameMap.entrySet()) { try { fs().copyFromLocalFile(new Path(entry.getKey()), - new Path(metaClient.getMetaPath(), entry.getValue())); + new Path(metaClient.getMetaPath().toString(), entry.getValue())); } catch (IOException e) { throw new HoodieIOException("Error copying state from local disk.", e); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 10a77f9b5b7..e25db7d5924 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -509,8 +509,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { metaClient = HoodieTestUtils.init(storageConf, basePath, getTableType(), properties); // set hoodie.table.version to 4 in hoodie.properties file metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FOUR); - HoodieTableConfig.update(metaClient.getStorage(), - new StoragePath(metaClient.getMetaPath()), metaClient.getTableConfig().getProps()); + HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), + metaClient.getTableConfig().getProps()); String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2().toString()); @@ -519,8 +519,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { .setConf(metaClient.getStorageConf().newInstance()).setBasePath(metadataTablePath).build(); 
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FOUR); HoodieTableConfig.update( - mdtMetaClient.getStorage(), - new StoragePath(mdtMetaClient.getMetaPath()), metaClient.getTableConfig().getProps()); + mdtMetaClient.getStorage(), mdtMetaClient.getMetaPath(), + metaClient.getTableConfig().getProps()); } assertTableVersionOnDataAndMetadataTable(metaClient, HoodieTableVersion.FOUR); @@ -902,7 +902,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { private void prepForDowngradeFromVersion(HoodieTableVersion fromVersion) throws IOException { metaClient.getTableConfig().setTableVersion(fromVersion); StoragePath propertyFile = new StoragePath( - metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); try (OutputStream os = metaClient.getStorage().create(propertyFile)) { metaClient.getTableConfig().getProps().store(os, ""); } @@ -910,9 +910,9 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { private void createResidualFile() throws IOException { Path propertyFile = - new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + new Path(metaClient.getMetaPath().toString(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); Path updatedPropertyFile = - new Path(metaClient.getMetaPath() + "/" + UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE); + new Path(metaClient.getMetaPath().toString(), UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE); // Step1: Copy hoodie.properties to hoodie.properties.orig FileSystem fs = (FileSystem) metaClient.getStorage().getFileSystem(); @@ -938,7 +938,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { assertEquals(expectedVersion.versionCode(), metaClient.getTableConfig().getTableVersion().versionCode()); StoragePath propertyFile = new StoragePath( - metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); // Load the properties and verify InputStream inputStream = metaClient.getStorage().open(propertyFile); HoodieConfig config = new HoodieConfig(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index ec13861b849..ecbe3fc1766 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -95,7 +95,7 @@ public class FSUtils { * @return {@code true} if table exists. {@code false} otherwise. 
*/ public static boolean isTableExists(String path, HoodieStorage storage) throws IOException { - return storage.exists(new StoragePath(path + "/" + HoodieTableMetaClient.METAFOLDER_NAME)); + return storage.exists(new StoragePath(path, HoodieTableMetaClient.METAFOLDER_NAME)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java index 0631ed587f1..7e6ce0e2135 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java @@ -46,7 +46,7 @@ public class HoodieHeartbeatUtils { public static Long getLastHeartbeatTime(HoodieStorage storage, String basePath, String instantTime) throws IOException { StoragePath heartbeatFilePath = new StoragePath( - HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + StoragePath.SEPARATOR + instantTime); + HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime); if (storage.exists(heartbeatFilePath)) { return storage.getPathInfo(heartbeatFilePath).getModificationTime(); } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 2acf8bc6f93..f6dcdce1c34 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -273,12 +273,12 @@ public class HoodieTableConfig extends HoodieConfig { // Delay between retries while reading the properties file private static final int READ_RETRY_DELAY_MSEC = 1000; - public HoodieTableConfig(HoodieStorage storage, String metaPath, String payloadClassName, String recordMergerStrategyId) { + public HoodieTableConfig(HoodieStorage storage, StoragePath metaPath, String payloadClassName, String recordMergerStrategyId) { super(); StoragePath propertyPath = new StoragePath(metaPath, HOODIE_PROPERTIES_FILE); LOG.info("Loading table properties from " + propertyPath); try { - this.props = fetchConfigs(storage, metaPath); + this.props = fetchConfigs(storage, metaPath.toString()); boolean needStore = false; if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null && !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) { @@ -782,7 +782,7 @@ public class HoodieTableConfig extends HoodieConfig { } setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); - update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), getProps()); + update(metaClient.getStorage(), metaClient.getMetaPath(), getProps()); LOG.info(String.format("MDT %s partition %s has been %s", metaClient.getBasePathV2(), partitionType.name(), enabled ? 
"enabled" : "disabled")); } @@ -800,7 +800,7 @@ public class HoodieTableConfig extends HoodieConfig { }); setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); - update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), getProps()); + update(metaClient.getStorage(), metaClient.getMetaPath(), getProps()); LOG.info(String.format("MDT %s partitions %s have been set to inflight", metaClient.getBasePathV2(), partitionTypes)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index bedf0204bf8..4105677e03d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -132,7 +132,7 @@ public class HoodieTableMetaClient implements Serializable { this.metaPath = new StoragePath(basePath, METAFOLDER_NAME); this.storage = getStorage(); TableNotFoundException.checkTableValidity(storage, this.basePath, metaPath); - this.tableConfig = new HoodieTableConfig(storage, metaPath.toString(), payloadClassName, recordMergerStrategy); + this.tableConfig = new HoodieTableConfig(storage, metaPath, payloadClassName, recordMergerStrategy); this.tableType = tableConfig.getTableType(); Option<TimelineLayoutVersion> tableConfigVersion = tableConfig.getTimelineLayoutVersion(); if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) { @@ -212,8 +212,8 @@ public class HoodieTableMetaClient implements Serializable { /** * @return Meta path */ - public String getMetaPath() { - return metaPath.toString(); // this invocation is cached + public StoragePath getMetaPath() { + return metaPath; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index cbe1691e318..7f53feb5a54 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -248,7 +248,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { deleteInstantFile(instant); } - public static void deleteInstantFile(HoodieStorage storage, String metaPath, HoodieInstant instant) { + public static void deleteInstantFile(HoodieStorage storage, StoragePath metaPath, HoodieInstant instant) { try { storage.deleteFile(new StoragePath(metaPath, instant.getFileName())); } catch (IOException e) { @@ -665,7 +665,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } private StoragePath getInstantFileNamePath(String fileName) { - return new StoragePath(fileName.contains(SCHEMA_COMMIT_ACTION) ? metaClient.getSchemaFolderName() : metaClient.getMetaPath(), fileName); + return new StoragePath(fileName.contains(SCHEMA_COMMIT_ACTION) ? 
metaClient.getSchemaFolderName() : metaClient.getMetaPath().toString(), fileName); } public void transitionRequestedToInflight(String commitType, String inFlightInstant) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java index ea6b8f429bd..9c6c05f4523 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java @@ -71,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView { Map<String, HoodieBaseFile> newFilesWrittenForPartition = filesWritten.stream() .filter(file -> partitionStr.equals(file.getPartitionPath())) .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat -> - new HoodieBaseFile(new StoragePath(tableMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, null))); + new HoodieBaseFile(new StoragePath(tableMetaClient.getBasePathV2(), writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, null))); Stream<HoodieBaseFile> committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr); Map<String, HoodieBaseFile> allFileIds = committedBaseFiles diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java index 6e4945628cf..43923b5e40a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java @@ -59,14 +59,13 @@ public class FileBasedInternalSchemaStorageManager extends AbstractInternalSchem private HoodieTableMetaClient metaClient; public FileBasedInternalSchemaStorageManager(StorageConfiguration<?> conf, StoragePath baseTablePath) { - StoragePath metaPath = new StoragePath(baseTablePath, ".hoodie"); + StoragePath metaPath = new StoragePath(baseTablePath, HoodieTableMetaClient.METAFOLDER_NAME); this.baseSchemaPath = new StoragePath(metaPath, SCHEMA_NAME); this.conf = conf; } public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient metaClient) { - StoragePath metaPath = new StoragePath(metaClient.getBasePath(), ".hoodie"); - this.baseSchemaPath = new StoragePath(metaPath, SCHEMA_NAME); + this.baseSchemaPath = new StoragePath(metaClient.getMetaPath(), SCHEMA_NAME); this.conf = metaClient.getStorageConf(); this.metaClient = metaClient; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 18a58df9320..1148503c5a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -85,7 +85,7 @@ public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata { StoragePath metaPath = new StoragePath(dataBasePath, HoodieTableMetaClient.METAFOLDER_NAME); TableNotFoundException.checkTableValidity(storage, this.dataBasePath, metaPath); - HoodieTableConfig tableConfig = new HoodieTableConfig(storage, metaPath.toString(), null, null); + HoodieTableConfig 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 55c9a49b61c..68932a5224f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -445,9 +445,9 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
       // If the base file is present then create a reader
       Option<HoodieBaseFile> basefile = slice.getBaseFile();
       if (basefile.isPresent()) {
-        String baseFilePath = basefile.get().getPath();
+        StoragePath baseFilePath = basefile.get().getStoragePath();
         baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
-            .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), new StoragePath(baseFilePath));
+            .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), baseFilePath);
         baseFileOpenMs = timer.endTimer();
         LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath,
             basefile.get().getCommitTime(), baseFileOpenMs));
diff --git a/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
index 0e7dbf83c51..8d769d99bf5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieSecondaryIndexException;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -125,7 +124,7 @@ public class SecondaryIndexManager {
       Properties updatedProps = new Properties();
       updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
           SecondaryIndexUtils.toJsonString(newSecondaryIndexes));
-      HoodieTableConfig.update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), updatedProps);
+      HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), updatedProps);
 
       LOG.info("Success to add secondary index metadata: {}", secondaryIndexToAdd);
 
@@ -157,9 +156,9 @@ public class SecondaryIndexManager {
       Properties updatedProps = new Properties();
       updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
           SecondaryIndexUtils.toJsonString(secondaryIndexesToKeep));
-      HoodieTableConfig.update(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), updatedProps);
+      HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), updatedProps);
     } else {
-      HoodieTableConfig.delete(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()),
+      HoodieTableConfig.delete(metaClient.getStorage(), metaClient.getMetaPath(),
           CollectionUtils.createSet(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key()));
     }
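Note: several hunks here and below swap new StoragePath(baseFile.getPath()) for baseFile.getStoragePath(), avoiding a String round-trip when opening readers. A minimal sketch of the idiom (the helper is hypothetical, reduced from the hunk above):

    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.storage.StoragePath;

    public class BaseFilePathSketch {
      // Hypothetical helper: prefer the typed accessor over re-parsing the String form.
      static StoragePath pathOf(HoodieBaseFile baseFile) {
        // Before: new StoragePath(baseFile.getPath()) parsed the String form again.
        // After: the already-constructed StoragePath is returned as-is.
        return baseFile.getStoragePath();
      }
    }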
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index b15e52969ef..54f302a85fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -41,7 +41,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.sink.meta.CkpMetadata;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.util.FlinkTables;
@@ -221,7 +220,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
       return;
     }
     try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(
-        HadoopFSUtils.getStorageConf(this.hadoopConf), new StoragePath(baseFile.getPath()))) {
+        HadoopFSUtils.getStorageConf(this.hadoopConf), baseFile.getStoragePath())) {
       iterator.forEachRemaining(hoodieKey -> {
         output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
       });
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e8926638294..128a7385bf0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -321,7 +321,7 @@ public class StreamerUtil {
     StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
     try {
       if (storage.exists(new StoragePath(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE))) {
-        return Option.of(new HoodieTableConfig(storage, metaPath.toString(), null, null));
+        return Option.of(new HoodieTableConfig(storage, metaPath, null, null));
       }
     } catch (IOException e) {
       throw new HoodieIOException("Get table config error", e);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
index 2e334a7554c..29560768266 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -110,7 +110,7 @@ public class ITTestBucketStreamWrite {
     // delete successful commit to simulate an unsuccessful write
     HoodieStorage storage = metaClient.getStorage();
-    StoragePath path = new StoragePath(metaClient.getMetaPath() + StoragePath.SEPARATOR + filename);
+    StoragePath path = new StoragePath(metaClient.getMetaPath(), filename);
     storage.deleteDirectory(path);
 
     // marker types are different for COW and MOR
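Note: as in ITTestBucketStreamWrite above, manual SEPARATOR concatenation is replaced by the two-argument constructor, which performs the join itself. A small sketch contrasting the two forms (both path values are placeholders; the final comparison is the expected outcome, not asserted by this commit):

    import org.apache.hudi.storage.StoragePath;

    public class PathJoinSketch {
      public static void main(String[] args) {
        StoragePath metaPath = new StoragePath("/tmp/hudi_table/.hoodie"); // placeholder
        String filename = "001.commit"; // placeholder instant file name
        // Before: parse a manually concatenated String.
        StoragePath concatenated = new StoragePath(metaPath + StoragePath.SEPARATOR + filename);
        // After: let StoragePath join parent and child.
        StoragePath joined = new StoragePath(metaPath, filename);
        System.out.println(concatenated.toString().equals(joined.toString())); // expected: true
      }
    }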
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
index 662c2ffe35a..2e3f546debe 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
@@ -64,7 +64,7 @@ public class DFSPropertiesConfiguration extends PropertiesConfig {
   public static final String CONF_FILE_DIR_ENV_NAME = "HUDI_CONF_DIR";
   public static final String DEFAULT_CONF_FILE_DIR = "file:/etc/hudi/conf";
   public static final StoragePath DEFAULT_PATH = new StoragePath(
-      DEFAULT_CONF_FILE_DIR + "/" + DEFAULT_PROPERTIES_FILE);
+      DEFAULT_CONF_FILE_DIR, DEFAULT_PROPERTIES_FILE);
 
   // props read from hudi-defaults.conf
   private static TypedProperties GLOBAL_PROPS = loadGlobalProps();
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
index a9f19c7ee01..7cf65ce1caa 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBootstrapIndex.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.permission.FsAction;
 import org.junit.jupiter.api.AfterEach;
@@ -100,7 +99,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
     props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), "false");
     Properties properties = new Properties();
     properties.putAll(props);
-    HoodieTableConfig.create(metaClient.getStorage(), new StoragePath(metaClient.getMetaPath()), properties);
+    HoodieTableConfig.create(metaClient.getStorage(), metaClient.getMetaPath(), properties);
     metaClient = createMetaClient(metaClient.getStorageConf().newInstance(), basePath);
     BootstrapIndex bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
index 2093e658c4e..7eb2901c1d3 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
@@ -70,7 +70,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
     initialRetryIntervalMs = fileSystemRetryConfig.getInitialRetryIntervalMs();
 
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), metaClient.getStorageConf()), 2);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf()), 2);
     FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs,
         maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
@@ -85,7 +85,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
   @Test
   public void testProcessFilesWithExceptions() throws Exception {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf()), 100);
     FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs,
         maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
@@ -102,7 +102,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
   @Test
   public void testGetSchema() {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf()), 100);
     FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs,
         maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
@@ -114,7 +114,7 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
   @Test
   public void testGetDefaultReplication() {
     FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(
-        HadoopFSUtils.getFs(metaClient.getMetaPath(), metaClient.getStorageConf()), 100);
+        HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf()), 100);
     FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs,
         maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
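Note: these test hunks show the flip side of the return-type change: APIs such as HadoopFSUtils.getFs that still take a String path now need an explicit .toString(). A minimal sketch of that boundary (the helper is hypothetical; only the getFs call shape is taken from the hunks above):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;

    public class StringBoundarySketch {
      // Hypothetical helper: StoragePath-first internally, String only at the edge.
      static FileSystem fsForMetaPath(HoodieTableMetaClient metaClient) {
        // getMetaPath() now returns StoragePath; getFs still takes a String path,
        // so the conversion happens exactly once, at the call boundary.
        return HadoopFSUtils.getFs(metaClient.getMetaPath().toString(), metaClient.getStorageConf());
      }
    }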
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 297ddda2091..fe7e57c5443 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -79,7 +79,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
   public void testCreate() throws IOException {
     assertTrue(
         storage.exists(new StoragePath(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE)));
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(6, config.getProps().size());
   }
 
@@ -92,7 +92,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
 
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(7, config.getProps().size());
     assertEquals("test-table2", config.getTableName());
     assertEquals("new_field", config.getPreCombineField());
@@ -106,7 +106,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
 
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(5, config.getProps().size());
     assertNull(config.getProps().getProperty("hoodie.invalid.config"));
     assertFalse(config.getProps().contains(HoodieTableConfig.ARCHIVELOG_FOLDER.key()));
@@ -116,13 +116,13 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
   public void testReadsWhenPropsFileDoesNotExist() throws IOException {
     storage.deleteFile(cfgPath);
     assertThrows(HoodieIOException.class, () -> {
-      new HoodieTableConfig(storage, metaPath.toString(), null, null);
+      new HoodieTableConfig(storage, metaPath, null, null);
     });
   }
 
   @Test
   public void testReadsWithUpdateFailures() throws IOException {
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     storage.deleteFile(cfgPath);
     try (OutputStream out = storage.create(backupCfgPath)) {
       config.getProps().store(out, "");
@@ -130,14 +130,14 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     assertFalse(storage.exists(cfgPath));
     assertTrue(storage.exists(backupCfgPath));
 
-    config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(6, config.getProps().size());
   }
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testUpdateRecovery(boolean shouldPropsFileExist) throws IOException {
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, null);
     if (!shouldPropsFileExist) {
       storage.deleteFile(cfgPath);
     }
@@ -148,7 +148,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     HoodieTableConfig.recoverIfNeeded(storage, cfgPath, backupCfgPath);
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
-    config = new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    config = new HoodieTableConfig(storage, metaPath, null, null);
     assertEquals(6, config.getProps().size());
   }
 
@@ -156,11 +156,11 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
   public void testReadRetry() throws IOException {
     // When both the hoodie.properties and hoodie.properties.backup do not exist then the read fails
     storage.rename(cfgPath, new StoragePath(cfgPath.toString() + ".bak"));
-    assertThrows(HoodieIOException.class, () -> new HoodieTableConfig(storage, metaPath.toString(), null, null));
+    assertThrows(HoodieIOException.class, () -> new HoodieTableConfig(storage, metaPath, null, null));
 
     // Should return the backup config if hoodie.properties is not present
     storage.rename(new StoragePath(cfgPath.toString() + ".bak"), backupCfgPath);
-    new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    new HoodieTableConfig(storage, metaPath, null, null);
 
     // Should return backup config if hoodie.properties is corrupted
     Properties props = new Properties();
@@ -168,14 +168,14 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     try (OutputStream out = storage.create(cfgPath)) {
       props.store(out, "Wrong checksum in file so is invalid");
     }
-    new HoodieTableConfig(storage, metaPath.toString(), null, null);
+    new HoodieTableConfig(storage, metaPath, null, null);
 
     // Should throw exception if both hoodie.properties and backup are corrupted
     try (OutputStream out = storage.create(backupCfgPath)) {
       props.store(out, "Wrong checksum in file so is invalid");
     }
     assertThrows(IllegalArgumentException.class, () -> new HoodieTableConfig(storage,
-        metaPath.toString(), null, null));
+        metaPath, null, null));
   }
 
   @Test
@@ -193,7 +193,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     Future readerFuture = executor.submit(() -> {
       for (int i = 0; i < 100; i++) {
         // Try to load the table properties, won't throw any exception
-        new HoodieTableConfig(storage, metaPath.toString(), null, null);
+        new HoodieTableConfig(storage, metaPath, null, null);
       }
     });
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index decdb2d7d24..9bbc72289f5 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -59,7 +59,7 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
     assertEquals(HoodieTestUtils.RAW_TRIPS_TEST_NAME, metaClient.getTableConfig().getTableName(),
         "Table name should be raw_trips");
     assertEquals(basePath, metaClient.getBasePath(), "Basepath should be the one assigned");
-    assertEquals(basePath + "/.hoodie", metaClient.getMetaPath(),
+    assertEquals(basePath + "/.hoodie", metaClient.getMetaPath().toString(),
         "Metapath should be ${basepath}/.hoodie");
     assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key()));
     assertTrue(HoodieTableConfig.validateChecksum(metaClient.getTableConfig().getProps()));
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 2484df8daa4..33f9fdf829f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -33,7 +33,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,6 +61,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
@@ -250,7 +250,7 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
             tableMetaClient,
             props,
             HoodieTableQueryType.SNAPSHOT,
-            partitionPaths.stream().map(e -> new StoragePath(e.toUri())).collect(Collectors.toList()),
+            partitionPaths.stream().map(HadoopFSUtils::convertToStoragePath).collect(Collectors.toList()),
             queryCommitInstant,
             shouldIncludePendingCommits);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 4110f47385b..97177ab260d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.RecordReader;
 import java.io.IOException;
 
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieHFileRecordReader implements RecordReader<NullWritable, ArrayWritable> {
 
@@ -54,7 +55,7 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
-    StoragePath path = new StoragePath(fileSplit.getPath().toUri());
+    StoragePath path = convertToStoragePath(fileSplit.getPath());
     HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
     reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), path, HoodieFileFormat.HFILE, Option.empty());
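Note: the hadoop-mr changes consolidate ad-hoc new StoragePath(path.toUri()) conversions behind HadoopFSUtils.convertToStoragePath, the single bridge from a Hadoop Path to a Hudi StoragePath. A minimal sketch of the bridge (the input path is a placeholder):

    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;
    import org.apache.hudi.storage.StoragePath;

    public class ConvertPathSketch {
      public static void main(String[] args) {
        Path hadoopPath = new Path("hdfs://nn:8020/tables/trips/2024/05/15"); // placeholder
        // One named helper instead of new StoragePath(hadoopPath.toUri()) at every call site.
        StoragePath storagePath = HadoopFSUtils.convertToStoragePath(hadoopPath);
        System.out.println(storagePath);
      }
    }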
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index d6a62f3a061..51d8a9f3af4 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -53,6 +53,7 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
 import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
@@ -133,7 +134,7 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
     try {
       if (storage == null) {
         storage =
-            HoodieStorageUtils.getStorage(new StoragePath(path.toUri()), conf);
+            HoodieStorageUtils.getStorage(convertToStoragePath(path), conf);
       }
 
       // Assumes path is a file
@@ -166,8 +167,9 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
 
       // Perform actual checking.
       Path baseDir;
-      if (HoodiePartitionMetadata.hasPartitionMetadata(storage, new StoragePath(folder.toUri()))) {
-        HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, new StoragePath(folder.toUri()));
+      StoragePath storagePath = convertToStoragePath(folder);
+      if (HoodiePartitionMetadata.hasPartitionMetadata(storage, storagePath)) {
+        HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, storagePath);
         metadata.readFromFS();
         baseDir = HoodieHiveUtils.getNthParent(folder, metadata.getPartitionDepth());
       } else {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index 454aa519bd5..79829cc3917 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -71,6 +71,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
 /**
  * This class is responsible for calculating names and types of fields that are actual at a certain point in time for hive.
  * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time.
@@ -114,10 +116,9 @@ public class SchemaEvolutionContext {
   private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
     try {
       Path inputPath = ((FileSplit) split).getPath();
-      StoragePath path = new StoragePath(inputPath.toString());
       FileSystem fs = inputPath.getFileSystem(job);
       HoodieStorage storage = HoodieStorageUtils.getStorage(fs);
-      Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, path);
+      Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, convertToStoragePath(inputPath));
       return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString())
           .setConf(HadoopFSUtils.getStorageConfWithCopy(job)).build();
     } catch (Exception e) {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 2af8e92baab..fac2336836b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -44,7 +44,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
 import org.apache.avro.Schema;
@@ -194,7 +193,7 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
     // build fileGroup from fsView
     List<StoragePathInfo> affectedPathInfoList = HoodieInputFormatUtils
-        .listAffectedFilesForCommits(job, new StoragePath(tableMetaClient.getBasePath()),
+        .listAffectedFilesForCommits(job, tableMetaClient.getBasePathV2(),
            metadataList);
     // step3
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
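Note: on the 0.x branch, getBasePath() still returns a String, so hunks like the one above (and HoodieTablePreCommitFileSystemView earlier) switch to getBasePathV2() when a typed path is wanted. A minimal sketch of the distinction (the helper and variable names are illustrative):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.storage.StoragePath;

    public class BasePathSketch {
      // Hypothetical helper contrasting the two accessors on branch-0.x.
      static StoragePath typedBasePath(HoodieTableMetaClient metaClient) {
        String asString = metaClient.getBasePath();      // legacy String accessor
        StoragePath asPath = metaClient.getBasePathV2(); // typed accessor, no re-parsing
        boolean samePath = asPath.toString().equals(asString); // expected: true
        return asPath;
      }
    }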
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 9db661daf81..6945b241e0a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -82,6 +82,7 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADAT
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieInputFormatUtils {
 
@@ -360,14 +361,15 @@ public class HoodieInputFormatUtils {
     Path baseDir = partitionPath;
     HoodieStorage storage = HoodieStorageUtils.getStorage(
         partitionPath.toString(), HadoopFSUtils.getStorageConf(conf));
-    if (HoodiePartitionMetadata.hasPartitionMetadata(storage, new StoragePath(partitionPath.toUri()))) {
-      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, new StoragePath(partitionPath.toUri()));
+    StoragePath partitionStoragePath = convertToStoragePath(partitionPath);
+    if (HoodiePartitionMetadata.hasPartitionMetadata(storage, partitionStoragePath)) {
+      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(storage, partitionStoragePath);
       metadata.readFromFS();
       int levels = metadata.getPartitionDepth();
       baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels);
     } else {
       for (int i = 0; i < partitionPath.depth(); i++) {
-        if (storage.exists(new StoragePath(new StoragePath(baseDir.toUri()), METAFOLDER_NAME))) {
+        if (storage.exists(new StoragePath(convertToStoragePath(baseDir), METAFOLDER_NAME))) {
           break;
         } else if (i == partitionPath.depth() - 1) {
           throw new TableNotFoundException(partitionPath.toString());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index f160307dcf9..666e51b81de 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -26,7 +26,6 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -67,6 +66,7 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeRecordReaderUtils.class);
@@ -308,7 +308,7 @@ public class HoodieRealtimeRecordReaderUtils {
   public static HoodieFileReader getBaseFileReader(Path path, JobConf conf) throws IOException {
     HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
     return HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-        .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), new StoragePath(path.toUri()));
+        .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), convertToStoragePath(path));
   }
 
   private static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 298618e60c6..0fcae011638 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -44,7 +44,6 @@ import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -279,7 +278,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
               .getFileReader(
                   DEFAULT_HUDI_CONFIG_FOR_READER,
                   metaClient.getStorageConf(),
-                  new StoragePath(fileSlice.getBaseFile().get().getPath())));
+                  fileSlice.getBaseFile().get().getStoragePath()));
       return new CloseableMappingIterator<>(reader.getRecordIterator(schema), HoodieRecord::getData);
     } else {
       // If there is no data file, fall back to reading log files
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index cafed4e5e70..ee815188d8e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -46,12 +46,12 @@ import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.io.storage.HoodieFileReaderFactory
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -429,7 +429,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
           .asJava)
 
       fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
-        val relativePath = getRelativePartitionPath(new StoragePath(basePath.toUri), partitionPath)
+        val relativePath = getRelativePartitionPath(convertToStoragePath(basePath), partitionPath)
         fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, ts).iterator().asScala
       }.toSeq
 
@@ -487,14 +487,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected def getPartitionColumnsAsInternalRowInternal(file: StoragePathInfo, basePath: Path,
                                                          extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
     if (extractPartitionValuesFromPartitionPath) {
-      val tablePathWithoutScheme = new StoragePath(basePath.toUri).getPathWithoutSchemeAndAuthority
-      val partitionPathWithoutScheme = new StoragePath(file.getPath.getParent.toUri).getPathWithoutSchemeAndAuthority
+      val baseStoragePath = convertToStoragePath(basePath)
+      val tablePathWithoutScheme = baseStoragePath.getPathWithoutSchemeAndAuthority
+      val partitionPathWithoutScheme = file.getPath.getParent.getPathWithoutSchemeAndAuthority
       val relativePath = tablePathWithoutScheme.toUri.relativize(partitionPathWithoutScheme.toUri).toString
       val timeZoneId = conf.get("timeZone", sparkSession.sessionState.conf.sessionLocalTimeZone)
       val rowValues = HoodieSparkUtils.parsePartitionColumnValues(
         partitionColumns,
         relativePath,
-        new StoragePath(basePath.toUri),
+        baseStoragePath,
        tableStructSchema,
        timeZoneId,
        sparkAdapter.getSparkParsePartitionUtil,
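Note: the Scala hunk above derives a partition's table-relative path by stripping scheme and authority from both paths and relativizing their URIs. A minimal Java sketch of the same computation (both paths and the printed result are illustrative placeholders):

    import org.apache.hudi.storage.StoragePath;

    public class RelativePartitionPathSketch {
      public static void main(String[] args) {
        StoragePath tablePath = new StoragePath("s3a://bucket/tables/trips"); // placeholder
        StoragePath partitionPath = new StoragePath("s3a://bucket/tables/trips/2024/05/15"); // placeholder
        // Drop scheme/authority so the URIs relativize on the path component alone.
        String relative = tablePath.getPathWithoutSchemeAndAuthority().toUri()
            .relativize(partitionPath.getPathWithoutSchemeAndAuthority().toUri()).toString();
        System.out.println(relative); // expected: 2024/05/15
      }
    }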
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
index 3a498d98a96..761f2ae49b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.LoggerFactory
 
@@ -80,7 +81,7 @@ class DedupeSparkJob(basePath: String,
       .setConf(storage.getConf.newInstance())
       .setBasePath(basePath).build()
 
-    val allFiles = storage.listDirectEntries(new StoragePath(s"$basePath/$duplicatedPartitionPath"))
+    val allFiles = storage.listDirectEntries(new StoragePath(basePath, duplicatedPartitionPath))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
     val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
     val filteredStatuses = latestFiles.asScala.map(f => f.getPath)
@@ -191,7 +192,7 @@ class DedupeSparkJob(basePath: String,
       .setConf(storage.getConf.newInstance())
       .setBasePath(basePath).build()
 
-    val allFiles = storage.listDirectEntries(new StoragePath(s"$basePath/$duplicatedPartitionPath"))
+    val allFiles = storage.listDirectEntries(new StoragePath(basePath, duplicatedPartitionPath))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
 
     val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
@@ -204,8 +205,8 @@ class DedupeSparkJob(basePath: String,
       val badSuffix = if (dupeFixPlan.contains(fileName)) ".bad" else ""
       val dstPath = new Path(s"$repairOutputPath/${filePath.getName}$badSuffix")
       LOG.info(s"Copying from $filePath to $dstPath")
-      FileIOUtils.copy(storage, new StoragePath(filePath.toUri), storage,
-        new StoragePath(dstPath.toUri), false, true)
+      FileIOUtils.copy(storage, convertToStoragePath(filePath), storage,
+        convertToStoragePath(dstPath), false, true)
     }
 
     // 2. Remove duplicates from the bad files
@@ -216,7 +217,7 @@ class DedupeSparkJob(basePath: String,
       LOG.info(" Skipping and writing new file for : " + fileName)
       SparkHelpers.skipKeysAndWriteNewFile(instantTime, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]],
         storage, badFilePath, newFilePath, dupeFixPlan(fileName))
-      storage.deleteFile(new StoragePath(badFilePath.toUri))
+      storage.deleteFile(badFilePath)
     }
 
     // 3. Check that there are no duplicates anymore.
@@ -249,8 +250,8 @@ class DedupeSparkJob(basePath: String,
       } else {
         // for real
         LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath")
-        FileIOUtils.copy(storage, new StoragePath(srcPath.toUri), storage,
-          new StoragePath(dstPath.toUri), false, true)
+        FileIOUtils.copy(storage, convertToStoragePath(srcPath), storage,
+          convertToStoragePath(dstPath), false, true)
       }
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index abcd13105dc..68d9c93fc7b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StoragePath}
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.specific.SpecificData
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
@@ -118,7 +119,7 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
     for (fs <- statuses.asScala) {
       // read the archived file
       val reader = HoodieLogFormat.newReader(
-        storage, new HoodieLogFile(new StoragePath(fs.getPath.toUri)), HoodieArchivedMetaEntry.getClassSchema)
+        storage, new HoodieLogFile(convertToStoragePath(fs.getPath)), HoodieArchivedMetaEntry.getClassSchema)
       // read the avro blocks
       while ( {
         reader.hasNext && copyCount < limit
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index 60cc9714a55..b9f43e12e66 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -98,7 +98,7 @@ class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBu
     }
     val props: Properties = new Properties
     props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key, "true")
-    HoodieTableConfig.update(metaClient.getStorage, new StoragePath(metaClient.getMetaPath), props)
+    HoodieTableConfig.update(metaClient.getStorage, metaClient.getMetaPath, props)
 
     rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
index 07b4992dbc8..3273c737747 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.storage.StoragePath
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -70,8 +68,7 @@ class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with ProcedureBu
     var newProps = new Properties
     loadNewProps(overwriteFilePath, newProps)
     val oldProps = metaClient.getTableConfig.propsMap
-    val metaPathDir = new StoragePath(tablePath, METAFOLDER_NAME)
-    HoodieTableConfig.create(metaClient.getStorage, metaPathDir, newProps)
+    HoodieTableConfig.create(metaClient.getStorage, metaClient.getMetaPath, newProps)
     // reload new props as checksum would have been added
     newProps = HoodieTableMetaClient.reload(metaClient).getTableConfig.getProps
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
index a47b756c4b2..adce16e7193 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
@@ -22,10 +22,8 @@ import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.storage.HoodieStorageUtils
+import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
-import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfter
 
 import java.io.File
@@ -83,7 +81,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
     assertResult(true)(Files.exists(Paths.get(s"$tablePath/$partitionVal")))
     assertResult(HoodieTableType.MERGE_ON_READ)(new HoodieTableConfig(
       HoodieStorageUtils.getStorage(tablePath, HoodieTestUtils.getDefaultStorageConf),
-      s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
+      new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME),
      HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue,
      HoodieTableConfig.RECORD_MERGER_STRATEGY.defaultValue).getTableType)
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
index 11213b56e26..bce28e8ae9c 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java
@@ -88,7 +88,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable implements Runnable {
       // and the markers from the requests pending processing.
       currentInstantAllMarkers.addAll(markerHandler.getAllMarkers(markerDir));
       currentInstantAllMarkers.addAll(pendingMarkers);
-      StoragePath tempPath = new StoragePath(basePath + StoragePath.SEPARATOR + HoodieTableMetaClient.TEMPFOLDER_NAME);
+      StoragePath tempPath = new StoragePath(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
       List<StoragePath> instants = MarkerUtils.getAllMarkerDir(tempPath, storage);
