This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1536d50870a2f78f0a70e80bcc55053e9ac76dac Author: Y Ethan Guo <[email protected]> AuthorDate: Wed May 15 06:25:12 2024 -0700 [HUDI-7745] Move Hadoop-dependent util methods to hudi-hadoop-common (#11193) --- .../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 4 +- .../hudi/client/utils/CommitMetadataUtils.java | 4 +- .../index/bucket/ConsistentBucketIndexUtils.java | 5 +- .../org/apache/hudi/io/HoodieAppendHandle.java | 3 +- .../java/org/apache/hudi/io/HoodieCDCLogger.java | 4 +- .../org/apache/hudi/io/HoodieCreateHandle.java | 3 +- .../java/org/apache/hudi/io/HoodieMergeHandle.java | 3 +- .../table/action/bootstrap/BootstrapUtils.java | 11 +- .../table/action/rollback/BaseRollbackHelper.java | 5 +- .../rollback/ListingBasedRollbackStrategy.java | 9 +- .../rollback/MarkerBasedRollbackStrategy.java | 7 +- .../HoodieSparkBootstrapSchemaProvider.java | 4 +- .../bootstrap/MetadataBootstrapHandlerFactory.java | 9 +- .../java/org/apache/hudi/table/TestCleaner.java | 7 +- ...dieSparkMergeOnReadTableInsertUpdateDelete.java | 3 +- .../hudi/common/bootstrap/FileStatusUtils.java | 86 ------- .../java/org/apache/hudi/common/fs/FSUtils.java | 229 +++-------------- .../org/apache/hudi/common/util/ConfigUtils.java | 57 ----- .../apache/hudi/common/util/TestConfigUtils.java | 10 +- .../sink/compact/ITTestHoodieFlinkCompactor.java | 3 +- .../apache/hudi/common/util/HadoopConfigUtils.java | 91 +++++++ .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 275 +++++++++++++++++++++ .../org/apache/hudi/common/fs/TestFSUtils.java | 6 +- .../table/view/TestHoodieTableFileSystemView.java | 6 +- .../hudi/common/testutils/HoodieTestTable.java | 3 +- .../hudi/common/util/TestHadoopConfigUtils.java | 63 +++++ .../hudi/hadoop/HoodieROTablePathFilter.java | 3 +- .../realtime/HoodieParquetRealtimeInputFormat.java | 3 +- .../hudi/hadoop/utils/HoodieInputFormatUtils.java | 2 +- .../TestHoodieMergeOnReadSnapshotReader.java | 2 +- .../realtime/TestHoodieRealtimeRecordReader.java | 3 +- 
.../SparkFullBootstrapDataProviderBase.java | 4 +- .../procedures/ShowInvalidParquetProcedure.scala | 2 +- .../org/apache/hudi/functional/TestBootstrap.java | 7 +- .../apache/hudi/functional/TestOrcBootstrap.java | 10 +- .../apache/hudi/sync/adb/HoodieAdbJdbcClient.java | 12 +- .../org/apache/hudi/hive/ddl/HMSDDLExecutor.java | 7 +- .../hudi/hive/ddl/QueryBasedDDLExecutor.java | 7 +- .../org/apache/hudi/hive/TestHiveSyncTool.java | 4 +- .../apache/hudi/sync/common/HoodieSyncClient.java | 6 +- .../apache/hudi/sync/common/HoodieSyncConfig.java | 4 +- .../apache/hudi/sync/common/HoodieSyncTool.java | 4 +- .../hudi/utilities/HoodieDataTableUtils.java | 2 +- .../apache/hudi/utilities/HoodieRepairTool.java | 7 +- .../hudi/utilities/HoodieSnapshotCopier.java | 2 +- .../hudi/utilities/HoodieSnapshotExporter.java | 2 +- 46 files changed, 570 insertions(+), 433 deletions(-) diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java index 11e3eaea1c0..d379109a624 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -301,7 +301,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { try { StorageDescriptor sd = table.storageDescriptor(); List<PartitionInput> partitionInputList = partitionsToAdd.stream().map(partition -> { - String fullPartitionPath = FSUtils.constructAbsolutePathInHadoopPath(s3aToS3(getBasePath()), partition).toString(); + String fullPartitionPath = FSUtils.constructAbsolutePath(s3aToS3(getBasePath()), partition).toString(); List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath)); return PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build(); @@ -345,7 
+345,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { try { StorageDescriptor sd = table.storageDescriptor(); List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> { - String fullPartitionPath = FSUtils.constructAbsolutePathInHadoopPath(s3aToS3(getBasePath()), partition).toString(); + String fullPartitionPath = FSUtils.constructAbsolutePath(s3aToS3(getBasePath()), partition).toString(); List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath)); PartitionInput partitionInput = PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java index 64f55b09e80..56014542394 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java @@ -151,7 +151,7 @@ public class CommitMetadataUtils { List<String> logFilePaths = new ArrayList<>(logFilesMarkerPath); HoodiePairData<String, List<String>> partitionPathLogFilePair = context.parallelize(logFilePaths).mapToPair(logFilePath -> { Path logFileFullPath = new Path(basePathStr, logFilePath); - String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePathStr), logFileFullPath.getParent()); + String partitionPath = HadoopFSUtils.getRelativePartitionPath(new Path(basePathStr), logFileFullPath.getParent()); return Pair.of(partitionPath, Collections.singletonList(logFileFullPath.getName())); }); HoodiePairData<String, Map<String, List<String>>> partitionPathToFileIdAndLogFileList = partitionPathLogFilePair @@ -169,7 +169,7 @@ public class 
CommitMetadataUtils { List<String> missingLogFiles = t.getValue(); Map<String, List<String>> fileIdtologFiles = new HashMap<>(); missingLogFiles.forEach(logFile -> { - String fileId = FSUtils.getFileIdFromLogPath(new Path(fullPartitionPath, logFile)); + String fileId = HadoopFSUtils.getFileIdFromLogPath(new Path(fullPartitionPath, logFile)); if (!fileIdtologFiles.containsKey(fileId)) { fileIdtologFiles.put(fileId, new ArrayList<>()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java index a90e0db6a06..069ec9e5b74 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -108,8 +109,8 @@ public class ConsistentBucketIndexUtils { */ public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable table, String partition) { HoodieTableMetaClient metaClient = table.getMetaClient(); - Path metadataPath = FSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(), partition); - Path partitionPath = FSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePathV2().toString(), partition); + Path metadataPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(), partition); + Path partitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePathV2().toString(), partition); try { Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> { String filename = fileStatus.getPath().getName(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 5b414c79b53..ce4a4a46506 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -55,6 +55,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieAppendException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -516,7 +517,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O // TODO we can actually deduce file size purely from AppendResult (based on offset and size // of the appended block) for (WriteStatus status : statuses) { - long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); + long logFileSize = HadoopFSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); status.getStat().setFileSizeInBytes(logFileSize); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index eec73b8ed9d..2397c2ea30f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ 
-19,7 +19,6 @@ package org.apache.hudi.io; import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; @@ -40,6 +39,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -216,7 +216,7 @@ public class HoodieCDCLogger implements Closeable { for (Path cdcAbsPath : cdcAbsPaths) { String cdcFileName = cdcAbsPath.getName(); String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName; - stats.put(cdcPath, FSUtils.getFileSize(fs, cdcAbsPath)); + stats.put(cdcPath, HadoopFSUtils.getFileSize(fs, cdcAbsPath)); } } catch (IOException e) { throw new HoodieUpsertException("Failed to get cdc write stat", e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index aaad39c3453..07f30c1e3fa 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.MetadataValues; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.storage.StoragePath; @@ -244,7 +245,7 @@ public class 
HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O stat.setPath(new StoragePath(config.getBasePath()), path); stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); - long fileSize = FSUtils.getFileSize(fs, new Path(path.toUri())); + long fileSize = HadoopFSUtils.getFileSize(fs, new Path(path.toUri())); stat.setTotalWriteBytes(fileSize); stat.setFileSizeInBytes(fileSize); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 8f310899174..ed18a2f0055 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -42,6 +42,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCorruptedDataException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieFileWriter; @@ -430,7 +431,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> fileWriter.close(); fileWriter = null; - long fileSizeInBytes = FSUtils.getFileSize(fs, new Path(newFilePath.toUri())); + long fileSizeInBytes = HadoopFSUtils.getFileSize(fs, new Path(newFilePath.toUri())); HoodieWriteStat stat = writeStatus.getStat(); stat.setTotalWriteBytes(fileSizeInBytes); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java index 3e9e6b42a61..6ced75a2a3b 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java @@ -19,11 +19,10 @@ package org.apache.hudi.table.action.bootstrap; import org.apache.hudi.avro.model.HoodieFileStatus; -import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -67,9 +66,9 @@ public class BootstrapUtils { for (FileStatus topLevelStatus: topLevelStatuses) { if (topLevelStatus.isFile() && filePathFilter.accept(topLevelStatus.getPath())) { - String relativePath = FSUtils.getRelativePartitionPath(basePath, topLevelStatus.getPath().getParent()); + String relativePath = HadoopFSUtils.getRelativePartitionPath(basePath, topLevelStatus.getPath().getParent()); Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count(); - HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(topLevelStatus); + HoodieFileStatus hoodieFileStatus = HadoopFSUtils.fromFileStatus(topLevelStatus); result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath))); } else if (topLevelStatus.isDirectory() && metaPathFilter.accept(topLevelStatus.getPath())) { subDirectories.add(topLevelStatus.getPath().toString()); @@ -86,9 +85,9 @@ public class BootstrapUtils { while (itr.hasNext()) { FileStatus status = itr.next(); if (pathFilter.accept(status.getPath())) { - String relativePath = FSUtils.getRelativePartitionPath(new Path(basePathStr), status.getPath().getParent()); + String relativePath = HadoopFSUtils.getRelativePartitionPath(new Path(basePathStr), status.getPath().getParent()); Integer level = 
(int) relativePath.chars().filter(ch -> ch == '/').count(); - HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(status); + HoodieFileStatus hoodieFileStatus = HadoopFSUtils.fromFileStatus(status); res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath))); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java index ca3f9b1c570..856b56ca321 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; @@ -291,7 +292,7 @@ public class BaseRollbackHelper implements Serializable { // lets map each log file to partition path and log file name .mapToPair((SerializablePairFunction<String, String, String>) t -> { Path logFilePath = new Path(basePathStr, t); - String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePathStr), logFilePath.getParent()); + String partitionPath = HadoopFSUtils.getRelativePartitionPath(new Path(basePathStr), logFilePath.getParent()); return Pair.of(partitionPath, logFilePath.getName()); }) // lets group by partition path and collect it as log file list per partition path @@ -356,7 +357,7 @@ public class BaseRollbackHelper implements Serializable { String basePath = metaClient.getBasePathV2().toString(); try { Path fullDeletePath = new 
Path(fileToDelete); - String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent()); + String partitionPath = HadoopFSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent()); boolean isDeleted = true; if (doDelete) { try { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java index 1fd054b9407..e6eca0924bd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -225,7 +226,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu } return false; }; - return fs.listStatus(FSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(), partitionPath), filter); + return fs.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(), partitionPath), filter); } private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, String partitionPath, String basePath, @@ -286,7 +287,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu } private static Path[] listFilesToBeDeleted(String basePath, String partitionPath) { - return new Path[] {FSUtils.constructAbsolutePathInHadoopPath(basePath, partitionPath)}; + return new Path[] {HadoopFSUtils.constructAbsolutePathInHadoopPath(basePath, 
partitionPath)}; } private static Path[] getFilesFromCommitMetadata(String basePath, HoodieCommitMetadata commitMetadata, String partitionPath) { @@ -300,7 +301,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu if (path.toString().endsWith(basefileExtension)) { String fileCommitTime = FSUtils.getCommitTime(path.getName()); return commit.equals(fileCommitTime); - } else if (FSUtils.isLogFile(path)) { + } else if (HadoopFSUtils.isLogFile(path)) { // Since the baseCommitTime is the only commit for new log files, it's okay here String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(new StoragePath(path.toUri())); return commit.equals(fileCommitTime); @@ -356,7 +357,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); String fileId = writeStat.getFileId(); String latestBaseInstant = latestFileSlice.getBaseInstantTime(); - Path fullLogFilePath = FSUtils.constructAbsolutePathInHadoopPath(table.getConfig().getBasePath(), writeStat.getPath()); + Path fullLogFilePath = HadoopFSUtils.constructAbsolutePathInHadoopPath(table.getConfig().getBasePath(), writeStat.getPath()); Map<String, Long> logFilesWithBlocksToRollback = Collections.singletonMap( fullLogFilePath.toString(), writeStat.getTotalWriteBytes() > 0 ? 
writeStat.getTotalWriteBytes() : 1L); hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java index 5ba61b38803..f1648ede24a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; @@ -80,17 +81,17 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> implements BaseRollbackPlan IOType type = IOType.valueOf(typeStr); String fileNameWithPartitionToRollback = WriteMarkers.stripMarkerSuffix(markerFilePath); Path fullFilePathToRollback = new Path(basePath, fileNameWithPartitionToRollback); - String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullFilePathToRollback.getParent()); + String partitionPath = HadoopFSUtils.getRelativePartitionPath(new Path(basePath), fullFilePathToRollback.getParent()); switch (type) { case MERGE: case CREATE: String fileId = null; String baseInstantTime = null; - if (FSUtils.isBaseFile(fullFilePathToRollback)) { + if (HadoopFSUtils.isBaseFile(fullFilePathToRollback)) { HoodieBaseFile baseFileToDelete = new HoodieBaseFile(fullFilePathToRollback.toString()); fileId = baseFileToDelete.getFileId(); baseInstantTime = 
baseFileToDelete.getCommitTime(); - } else if (FSUtils.isLogFile(fullFilePathToRollback)) { + } else if (HadoopFSUtils.isLogFile(fullFilePathToRollback)) { throw new HoodieRollbackException("Log files should have only APPEND as IOTypes " + fullFilePathToRollback); } Objects.requireNonNull(fileId, "Cannot find valid fileId from path: " + fullFilePathToRollback); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java index 6319928f8de..cdbafc7c101 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java @@ -22,13 +22,13 @@ import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.AvroOrcUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; @@ -54,7 +54,7 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro @Override protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair<String, List<HoodieFileStatus>>> partitions) { Schema schema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> { - Path filePath = FileStatusUtils.toPath(fs.getPath()); + Path filePath = 
HadoopFSUtils.toPath(fs.getPath()); String extension = FSUtils.getFileExtension(filePath.getName()); if (PARQUET.getFileExtension().equals(extension)) { return getBootstrapSourceSchemaParquet(writeConfig, context, filePath); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java index 9fa9e1cbf73..98bbe9b1aba 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java @@ -18,13 +18,14 @@ package org.apache.hudi.table.action.bootstrap; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.bootstrap.FileStatusUtils; +import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.avro.model.HoodieFileStatus; + +import org.apache.hadoop.fs.Path; import static org.apache.hudi.common.model.HoodieFileFormat.ORC; import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; @@ -32,7 +33,7 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; public class MetadataBootstrapHandlerFactory { public static BootstrapMetadataHandler getMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) { - Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath()); + Path sourceFilePath = HadoopFSUtils.toPath(srcFileStatus.getPath()); String extension = FSUtils.getFileExtension(sourceFilePath.toString()); if (ORC.getFileExtension().equals(extension)) { diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 26b3efed499..723fa6b1614 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -71,6 +71,7 @@ import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; import org.apache.hudi.metadata.HoodieTableMetadataWriter; @@ -861,9 +862,9 @@ public class TestCleaner extends HoodieCleanerTestBase { version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).size()); assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(), version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).size()); - assertEquals(new Path(FSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePath(), partition1), fileName1).toString(), + assertEquals(new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition1), fileName1).toString(), version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).get(0).getFilePath()); - assertEquals(new Path(FSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePath(), partition2), fileName2).toString(), + assertEquals(new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition2), fileName2).toString(), version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).get(0).getFilePath()); // Downgrade and verify version 1 plan @@ -1341,7 +1342,7 @@ public class TestCleaner extends HoodieCleanerTestBase { String fileName = Paths.get(fullPath).getFileName().toString(); return 
Pair.of(FSUtils.getFileId(fileName), FSUtils.getCommitTime(fileName)); }); - Stream<Pair<String, String>> stream2 = paths.stream().filter(rtFilePredicate).map(path -> Pair.of(FSUtils.getFileIdFromLogPath(new Path(path)), + Stream<Pair<String, String>> stream2 = paths.stream().filter(rtFilePredicate).map(path -> Pair.of(HadoopFSUtils.getFileIdFromLogPath(new Path(path)), FSUtils.getBaseCommitTimeFromLogPath(new StoragePath(path)))); return Stream.concat(stream1, stream2); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index 263a4d5314f..8e85208af6f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -47,6 +47,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.table.HoodieSparkTable; @@ -368,7 +369,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie // inject a fake log file to test marker file for log file HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) statuses.map(WriteStatus::getStat).take(1).get(0); - assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath()))); + assertTrue(HadoopFSUtils.isLogFile(new Path(correctWriteStat.getPath()))); HoodieLogFile correctLogFile = new HoodieLogFile(correctWriteStat.getPath()); String 
correctWriteToken = FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/FileStatusUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/FileStatusUtils.java index 026af3714b1..5593b2f7f53 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/FileStatusUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/FileStatusUtils.java @@ -18,62 +18,14 @@ package org.apache.hudi.common.bootstrap; -import org.apache.hudi.avro.model.HoodieFSPermission; import org.apache.hudi.avro.model.HoodieFileStatus; -import org.apache.hudi.avro.model.HoodiePath; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; - -import java.io.IOException; - /** * Helper functions around FileStatus and HoodieFileStatus. */ public class FileStatusUtils { - - public static Path toPath(HoodiePath path) { - if (null == path) { - return null; - } - return new Path(path.getUri()); - } - - public static HoodiePath fromPath(Path path) { - if (null == path) { - return null; - } - return HoodiePath.newBuilder().setUri(path.toString()).build(); - } - - public static FsPermission toFSPermission(HoodieFSPermission fsPermission) { - if (null == fsPermission) { - return null; - } - FsAction userAction = fsPermission.getUserAction() != null ? FsAction.valueOf(fsPermission.getUserAction()) : null; - FsAction grpAction = fsPermission.getGroupAction() != null ? FsAction.valueOf(fsPermission.getGroupAction()) : null; - FsAction otherAction = - fsPermission.getOtherAction() != null ? FsAction.valueOf(fsPermission.getOtherAction()) : null; - boolean stickyBit = fsPermission.getStickyBit() != null ? 
fsPermission.getStickyBit() : false; - return new FsPermission(userAction, grpAction, otherAction, stickyBit); - } - - public static HoodieFSPermission fromFSPermission(FsPermission fsPermission) { - if (null == fsPermission) { - return null; - } - String userAction = fsPermission.getUserAction() != null ? fsPermission.getUserAction().name() : null; - String grpAction = fsPermission.getGroupAction() != null ? fsPermission.getGroupAction().name() : null; - String otherAction = fsPermission.getOtherAction() != null ? fsPermission.getOtherAction().name() : null; - return HoodieFSPermission.newBuilder().setUserAction(userAction).setGroupAction(grpAction) - .setOtherAction(otherAction).setStickyBit(fsPermission.getStickyBit()).build(); - } - public static StoragePathInfo toStoragePathInfo(HoodieFileStatus fileStatus) { if (null == fileStatus) { return null; @@ -84,42 +36,4 @@ public class FileStatusUtils { fileStatus.getIsDir() == null ? false : fileStatus.getIsDir(), fileStatus.getBlockReplication().shortValue(), fileStatus.getBlockSize(), fileStatus.getModificationTime()); } - - public static HoodieFileStatus fromFileStatus(FileStatus fileStatus) { - if (null == fileStatus) { - return null; - } - - HoodieFileStatus fStatus = new HoodieFileStatus(); - try { - fStatus.setPath(fromPath(fileStatus.getPath())); - fStatus.setLength(fileStatus.getLen()); - fStatus.setIsDir(fileStatus.isDirectory()); - fStatus.setBlockReplication((int) fileStatus.getReplication()); - fStatus.setBlockSize(fileStatus.getBlockSize()); - fStatus.setModificationTime(fileStatus.getModificationTime()); - fStatus.setAccessTime(fileStatus.getModificationTime()); - fStatus.setSymlink(fileStatus.isSymlink() ? 
fromPath(fileStatus.getSymlink()) : null); - safeReadAndSetMetadata(fStatus, fileStatus); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } - return fStatus; - } - - /** - * Used to safely handle FileStatus calls which might fail on some FileSystem implementation. - * (DeprecatedLocalFileSystem) - */ - private static void safeReadAndSetMetadata(HoodieFileStatus fStatus, FileStatus fileStatus) { - try { - fStatus.setOwner(fileStatus.getOwner()); - fStatus.setGroup(fileStatus.getGroup()); - fStatus.setPermission(fromFSPermission(fileStatus.getPermission())); - } catch (IllegalArgumentException ie) { - // Deprecated File System (testing) does not work well with this call - // skipping - } - } - } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index b2f87b9f01a..ec13861b849 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.exception.InvalidHoodiePathException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathFilter; @@ -42,9 +43,6 @@ import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.storage.inline.InLineFSUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.slf4j.Logger; @@ -71,8 +69,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; 
import java.util.stream.Stream; -import static org.apache.hudi.storage.HoodieStorageUtils.getStorageConfWithCopy; - /** * Utility functions related to accessing the file storage. */ @@ -85,8 +81,8 @@ public class FSUtils { public static final Pattern LOG_FILE_PATTERN = Pattern.compile("^\\.(.+)_(.*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(.cdc)?)?"); public static final Pattern PREFIX_BY_FILE_ID_PATTERN = Pattern.compile("^(.+)-(\\d+)"); - private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; + private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final String LOG_FILE_EXTENSION = ".log"; private static final StoragePathFilter ALLOW_ALL_FILTER = file -> true; @@ -102,17 +98,6 @@ public class FSUtils { return storage.exists(new StoragePath(path + "/" + HoodieTableMetaClient.METAFOLDER_NAME)); } - /** - * Makes path qualified w/ {@link FileSystem}'s URI - * - * @param fs instance of {@link FileSystem} path belongs to - * @param path path to be qualified - * @return qualified path, prefixed w/ the URI of the target FS object provided - */ - public static Path makeQualified(FileSystem fs, Path path) { - return path.makeQualified(fs.getUri(), fs.getWorkingDirectory()); - } - /** * Makes path qualified with {@link HoodieStorage}'s URI. * @@ -159,10 +144,6 @@ public class FSUtils { } } - public static long getFileSize(FileSystem fs, Path path) throws IOException { - return fs.getFileStatus(path).getLen(); - } - public static long getFileSize(HoodieStorage storage, StoragePath path) throws IOException { return storage.getPathInfo(path).getLength(); } @@ -188,13 +169,6 @@ public class FSUtils { return datePartitions; } - /** - * Given a base partition and a partition path, return relative path of partition path to the base path. 
- */ - public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath) { - return getRelativePartitionPath(new StoragePath(basePath.toUri()), new StoragePath(fullPartitionPath.toUri())); - } - public static String getRelativePartitionPath(StoragePath basePath, StoragePath fullPartitionPath) { basePath = getPathWithoutSchemeAndAuthority(basePath); fullPartitionPath = getPathWithoutSchemeAndAuthority(fullPartitionPath); @@ -316,7 +290,7 @@ public class FSUtils { result.add(Option.of(filenameToFileStatusMap.get(fileName))); } else { if (!ignoreMissingFiles) { - throw new FileNotFoundException("File not found: " + new Path(partitionPathIncludeBasePath.toString(), fileName)); + throw new FileNotFoundException("File not found: " + new StoragePath(partitionPathIncludeBasePath, fileName)); } result.add(Option.empty()); } @@ -387,18 +361,6 @@ public class FSUtils { return matcher.group(3); } - /** - * Get the first part of the file name in the log file. That will be the fileId. Log file do not have instantTime in - * the file name. - */ - public static String getFileIdFromLogPath(Path path) { - Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); - if (!matcher.find()) { - throw new InvalidHoodiePathException(path.toString(), "LogFile"); - } - return matcher.group(1); - } - public static String getFileIdFromLogPath(StoragePath path) { Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); if (!matcher.find()) { @@ -407,16 +369,6 @@ public class FSUtils { return matcher.group(1); } - /** - * Check if the file is a base file of a log file. Then get the fileId appropriately. 
- */ - public static String getFileIdFromFilePath(Path filePath) { - if (FSUtils.isLogFile(filePath)) { - return FSUtils.getFileIdFromLogPath(filePath); - } - return FSUtils.getFileId(filePath.getName()); - } - public static String getFileIdFromFilePath(StoragePath filePath) { if (FSUtils.isLogFile(filePath)) { return FSUtils.getFileIdFromLogPath(filePath); @@ -506,11 +458,6 @@ public class FSUtils { return HoodieLogFile.LOG_FILE_PREFIX + suffix; } - public static boolean isBaseFile(Path path) { - String extension = getFileExtension(path.getName()); - return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension); - } - public static boolean isBaseFile(StoragePath path) { String extension = getFileExtension(path.getName()); return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension); @@ -522,10 +469,6 @@ public class FSUtils { ? InLineFSUtils.getOuterFilePathFromInlinePath(logPath).getName() : logPath.getName()); } - public static boolean isLogFile(Path logPath) { - return isLogFile(new StoragePath(logPath.getName())); - } - public static boolean isLogFile(String fileName) { if (fileName.contains(LOG_FILE_EXTENSION)) { Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); @@ -534,40 +477,10 @@ public class FSUtils { return false; } - /** - * Returns true if the given path is a Base file or a Log file. - */ - public static boolean isDataFile(Path path) { - return isBaseFile(path) || isLogFile(path); - } - public static boolean isDataFile(StoragePath path) { return isBaseFile(path) || isLogFile(path); } - /** - * Get the names of all the base and log files in the given partition path. 
- */ - public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partitionPath) throws IOException { - final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values()) - .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new)); - final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension(); - - try { - return Arrays.stream(fs.listStatus(partitionPath, path -> { - String extension = FSUtils.getFileExtension(path.getName()); - return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension); - })).filter(FileStatus::isFile).toArray(FileStatus[]::new); - } catch (IOException e) { - // return empty FileStatus if partition does not exist already - if (!fs.exists(partitionPath)) { - return new FileStatus[0]; - } else { - throw e; - } - } - } - public static List<StoragePathInfo> getAllDataFilesInPartition(HoodieStorage storage, StoragePath partitionPath) throws IOException { @@ -632,7 +545,7 @@ public class FSUtils { * computes the next log version for the specified fileId in the partition path. */ public static int computeNextLogVersion(HoodieStorage storage, StoragePath partitionPath, final String fileId, - final String logFileExtension, final String baseCommitTime) throws IOException { + final String logFileExtension, final String baseCommitTime) throws IOException { Option<Pair<Integer, String>> currentVersionWithWriteToken = getLatestLogVersion(storage, partitionPath, fileId, logFileExtension, baseCommitTime); // handle potential overflow @@ -640,29 +553,6 @@ public class FSUtils { : HoodieLogFile.LOGFILE_BASE_VERSION; } - /** - * When a file was opened and the task died without closing the stream, another task executor cannot open because the - * existing lease will be active. We will try to recover the lease, from HDFS. If a data node went down, it takes - * about 10 minutes for the lease to be recovered. But if the client dies, this should be instant. 
- */ - public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p) - throws IOException, InterruptedException { - LOG.info("Recover lease on dfs file {}", p); - // initiate the recovery - boolean recovered = false; - for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) { - LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p); - recovered = dfs.recoverLease(p); - if (recovered) { - break; - } - // Sleep for 1 second before trying again. Typically it takes about 2-3 seconds to recover - // under default settings - Thread.sleep(1000); - } - return recovered; - } - public static void createPathIfNotExists(HoodieStorage storage, StoragePath partitionPath) throws IOException { if (!storage.exists(partitionPath)) { @@ -674,10 +564,6 @@ public class FSUtils { return sizeInBytes / (1024 * 1024); } - public static Path constructAbsolutePathInHadoopPath(String basePath, String relativePartitionPath) { - return new Path(constructAbsolutePath(basePath, relativePartitionPath).toUri()); - } - public static StoragePath constructAbsolutePath(String basePath, String relativePartitionPath) { if (StringUtils.isNullOrEmpty(relativePartitionPath)) { return new StoragePath(basePath); @@ -714,13 +600,6 @@ public class FSUtils { return filePathWithPartition.substring(offset); } - /** - * Get DFS full partition path (e.g. hdfs://ip-address:8020:/<absolute path>) - */ - public static String getDFSFullPartitionPath(FileSystem fs, Path fullPartitionPath) { - return fs.getUri() + fullPartitionPath.toUri().getRawPath(); - } - /** * Helper to filter out paths under metadata folder when running fs.globStatus. 
* @@ -766,27 +645,6 @@ public class FSUtils { return false; } - public static <T> Map<String, T> parallelizeFilesProcess( - HoodieEngineContext hoodieEngineContext, - FileSystem fs, - int parallelism, - SerializableFunction<Pair<String, StorageConfiguration<Configuration>>, T> pairFunction, - List<String> subPaths) { - Map<String, T> result = new HashMap<>(); - if (subPaths.size() > 0) { - StorageConfiguration<Configuration> conf = getStorageConfWithCopy(fs.getConf()); - int actualParallelism = Math.min(subPaths.size(), parallelism); - - hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(), - "Parallel listing paths " + String.join(",", subPaths)); - - result = hoodieEngineContext.mapToPair(subPaths, - subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))), - actualParallelism); - } - return result; - } - /** * Processes sub-path in parallel. * @@ -847,61 +705,17 @@ public class FSUtils { */ public static boolean deleteSubPath(String subPathStr, StorageConfiguration<?> conf, boolean recursive) { try { - Path subPath = new Path(subPathStr); - FileSystem fileSystem = subPath.getFileSystem(conf.unwrapAs(Configuration.class)); - return fileSystem.delete(subPath, recursive); + StoragePath subPath = new StoragePath(subPathStr); + HoodieStorage storage = HoodieStorageUtils.getStorage(subPath, conf); + if (recursive) { + return storage.deleteDirectory(subPath); + } + return storage.deleteFile(subPath); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } } - /** - * Lists file status at a certain level in the directory hierarchy. - * <p> - * E.g., given "/tmp/hoodie_table" as the rootPath, and 3 as the expected level, - * this method gives back the {@link FileStatus} of all files under - * "/tmp/hoodie_table/[*]/[*]/[*]/" folders. - * - * @param hoodieEngineContext {@link HoodieEngineContext} instance. - * @param fs {@link FileSystem} instance. - * @param rootPath Root path for the file listing. 
- * @param expectLevel Expected level of directory hierarchy for files to be added. - * @param parallelism Parallelism for the file listing. - * @return A list of file status of files at the level. - */ - - public static List<FileStatus> getFileStatusAtLevel( - HoodieEngineContext hoodieEngineContext, FileSystem fs, Path rootPath, - int expectLevel, int parallelism) { - List<String> levelPaths = new ArrayList<>(); - List<FileStatus> result = new ArrayList<>(); - levelPaths.add(rootPath.toString()); - - for (int i = 0; i <= expectLevel; i++) { - result = FSUtils.parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, - pairOfSubPathAndConf -> { - Path path = new Path(pairOfSubPathAndConf.getKey()); - try { - FileSystem fileSystem = path.getFileSystem(pairOfSubPathAndConf.getValue().unwrap()); - return Arrays.stream(fileSystem.listStatus(path)) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new HoodieIOException("Failed to list " + path, e); - } - }, - levelPaths) - .values().stream() - .flatMap(list -> list.stream()).collect(Collectors.toList()); - if (i < expectLevel) { - levelPaths = result.stream() - .filter(FileStatus::isDirectory) - .map(fileStatus -> fileStatus.getPath().toString()) - .collect(Collectors.toList()); - } - } - return result; - } - public static List<StoragePathInfo> getAllDataPathInfo(HoodieStorage storage, StoragePath path) throws IOException { List<StoragePathInfo> pathInfoList = new ArrayList<>(); @@ -917,6 +731,29 @@ public class FSUtils { return pathInfoList; } + /** + * When a file was opened and the task died without closing the stream, another task executor cannot open because the + * existing lease will be active. We will try to recover the lease, from HDFS. If a data node went down, it takes + * about 10 minutes for the lease to be recovered. But if the client dies, this should be instant. 
+ */ + public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p) + throws IOException, InterruptedException { + LOG.info("Recover lease on dfs file {}", p); + // initiate the recovery + boolean recovered = false; + for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) { + LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p); + recovered = dfs.recoverLease(p); + if (recovered) { + break; + } + // Sleep for 1 second before trying again. Typically it takes about 2-3 seconds to recover + // under default settings + Thread.sleep(1000); + } + return recovered; + } + /** * Serializable function interface. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java index 643b123d596..3866069d437 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,18 +150,6 @@ public class ConfigUtils { return sb.toString(); } - /** - * Creates a Hadoop {@link Configuration} instance with the properties. - * - * @param props {@link Properties} instance. - * @return Hadoop {@link Configuration} instance. 
- */ - public static Configuration createHadoopConf(Properties props) { - Configuration hadoopConf = new Configuration(); - props.stringPropertyNames().forEach(k -> hadoopConf.set(k, props.getProperty(k))); - return hadoopConf; - } - /** * Case-insensitive resolution of input enum name to the enum type */ @@ -301,32 +288,6 @@ public class ConfigUtils { return Option.empty(); } - /** - * Gets the raw value for a {@link ConfigProperty} config from Hadoop configuration. The key and - * alternative keys are used to fetch the config. - * - * @param conf Configs in Hadoop {@link Configuration}. - * @param configProperty {@link ConfigProperty} config to fetch. - * @return {@link Option} of value if the config exists; empty {@link Option} otherwise. - */ - public static Option<String> getRawValueWithAltKeys(Configuration conf, - ConfigProperty<?> configProperty) { - String value = conf.get(configProperty.key()); - if (value != null) { - return Option.of(value); - } - for (String alternative : configProperty.getAlternatives()) { - String altValue = conf.get(alternative); - if (altValue != null) { - LOG.warn(String.format("The configuration key '%s' has been deprecated " - + "and may be removed in the future. Please use the new key '%s' instead.", - alternative, configProperty.key())); - return Option.of(altValue); - } - } - return Option.empty(); - } - /** * Gets the String value for a {@link ConfigProperty} config from properties. The key and * alternative keys are used to fetch the config. If the config is not found, an @@ -453,24 +414,6 @@ public class ConfigUtils { return rawValue.map(v -> Boolean.parseBoolean(v.toString())).orElse(defaultValue); } - /** - * Gets the boolean value for a {@link ConfigProperty} config from Hadoop configuration. The key and - * alternative keys are used to fetch the config. The default value of {@link ConfigProperty} - * config, if exists, is returned if the config is not found in the configuration. 
- * - * @param conf Configs in Hadoop {@link Configuration}. - * @param configProperty {@link ConfigProperty} config to fetch. - * @return boolean value if the config exists; default boolean value if the config does not exist - * and there is default value defined in the {@link ConfigProperty} config; {@code false} otherwise. - */ - public static boolean getBooleanWithAltKeys(Configuration conf, - ConfigProperty<?> configProperty) { - Option<String> rawValue = getRawValueWithAltKeys(conf, configProperty); - boolean defaultValue = configProperty.hasDefaultValue() - ? Boolean.parseBoolean(configProperty.defaultValue().toString()) : false; - return rawValue.map(Boolean::parseBoolean).orElse(defaultValue); - } - /** * Gets the integer value for a {@link ConfigProperty} config from properties. The key and * alternative keys are used to fetch the config. The default value of {@link ConfigProperty} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java index 1f959ba1b58..5728dd8d36c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java @@ -19,6 +19,8 @@ package org.apache.hudi.common.util; +import org.apache.hudi.common.config.ConfigProperty; + import org.junit.jupiter.api.Test; import java.util.HashMap; @@ -28,7 +30,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class TestConfigUtils { - + public static final ConfigProperty<String> TEST_BOOLEAN_CONFIG_PROPERTY = ConfigProperty + .key("hoodie.test.boolean.config") + .defaultValue("true") + .withAlternatives("hudi.test.boolean.config") + .markAdvanced() + .withDocumentation("Testing boolean config."); + @Test public void testToMapSucceeds() { Map<String, String> expectedMap = new HashMap<>(); diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index ac4d2ea7783..b925a895628 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -53,7 +53,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; @@ -429,7 +428,7 @@ public class ITTestHoodieFlinkCompactor { try { storage.listDirectEntries(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(), partition)) .stream() - .filter(f -> FSUtils.isBaseFile(new Path(f.getPath().toUri()))) + .filter(f -> FSUtils.isBaseFile(f.getPath())) .forEach(f -> { HoodieBaseFile baseFile = new HoodieBaseFile(f); assertFalse(fileIdCommitTimeSet.contains( diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HadoopConfigUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HadoopConfigUtils.java new file mode 100644 index 00000000000..9f1347872e2 --- /dev/null +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HadoopConfigUtils.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.config.ConfigProperty; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Utils on Hadoop {@link Configuration}. + */ +public class HadoopConfigUtils { + private static final Logger LOG = LoggerFactory.getLogger(HadoopConfigUtils.class); + + /** + * Creates a Hadoop {@link Configuration} instance with the properties. + * + * @param props {@link Properties} instance. + * @return Hadoop {@link Configuration} instance. + */ + public static Configuration createHadoopConf(Properties props) { + Configuration hadoopConf = new Configuration(); + props.stringPropertyNames().forEach(k -> hadoopConf.set(k, props.getProperty(k))); + return hadoopConf; + } + + /** + * Gets the raw value for a {@link ConfigProperty} config from Hadoop configuration. The key and + * alternative keys are used to fetch the config. + * + * @param conf Configs in Hadoop {@link Configuration}. + * @param configProperty {@link ConfigProperty} config to fetch. + * @return {@link Option} of value if the config exists; empty {@link Option} otherwise. 
+ */ + public static Option<String> getRawValueWithAltKeys(Configuration conf, + ConfigProperty<?> configProperty) { + String value = conf.get(configProperty.key()); + if (value != null) { + return Option.of(value); + } + for (String alternative : configProperty.getAlternatives()) { + String altValue = conf.get(alternative); + if (altValue != null) { + LOG.warn(String.format("The configuration key '%s' has been deprecated " + + "and may be removed in the future. Please use the new key '%s' instead.", + alternative, configProperty.key())); + return Option.of(altValue); + } + } + return Option.empty(); + } + + /** + * Gets the boolean value for a {@link ConfigProperty} config from Hadoop configuration. The key and + * alternative keys are used to fetch the config. The default value of {@link ConfigProperty} + * config, if exists, is returned if the config is not found in the configuration. + * + * @param conf Configs in Hadoop {@link Configuration}. + * @param configProperty {@link ConfigProperty} config to fetch. + * @return boolean value if the config exists; default boolean value if the config does not exist + * and there is default value defined in the {@link ConfigProperty} config; {@code false} otherwise. + */ + public static boolean getBooleanWithAltKeys(Configuration conf, + ConfigProperty<?> configProperty) { + Option<String> rawValue = getRawValueWithAltKeys(conf, configProperty); + boolean defaultValue = configProperty.hasDefaultValue() + ? 
Boolean.parseBoolean(configProperty.defaultValue().toString()) : false; + return rawValue.map(Boolean::parseBoolean).orElse(defaultValue); + } +} diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java index 3119ee8c0c0..ca504577b40 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java @@ -19,7 +19,18 @@ package org.apache.hudi.hadoop.fs; +import org.apache.hudi.avro.model.HoodieFSPermission; +import org.apache.hudi.avro.model.HoodieFileStatus; +import org.apache.hudi.avro.model.HoodiePath; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.InvalidHoodiePathException; +import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; @@ -33,12 +44,22 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import 
java.util.stream.Collectors; /** * Utility functions related to accessing the file storage on Hadoop. @@ -264,4 +285,258 @@ public class HadoopFSUtils { HoodieWrapperFileSystem.class.getName()); return returnConf; } + + public static Path toPath(HoodiePath path) { + if (null == path) { + return null; + } + return new Path(path.getUri()); + } + + public static HoodiePath fromPath(Path path) { + if (null == path) { + return null; + } + return HoodiePath.newBuilder().setUri(path.toString()).build(); + } + + public static FsPermission toFSPermission(HoodieFSPermission fsPermission) { + if (null == fsPermission) { + return null; + } + FsAction userAction = fsPermission.getUserAction() != null ? FsAction.valueOf(fsPermission.getUserAction()) : null; + FsAction grpAction = fsPermission.getGroupAction() != null ? FsAction.valueOf(fsPermission.getGroupAction()) : null; + FsAction otherAction = + fsPermission.getOtherAction() != null ? FsAction.valueOf(fsPermission.getOtherAction()) : null; + boolean stickyBit = fsPermission.getStickyBit() != null ? fsPermission.getStickyBit() : false; + return new FsPermission(userAction, grpAction, otherAction, stickyBit); + } + + public static HoodieFSPermission fromFSPermission(FsPermission fsPermission) { + if (null == fsPermission) { + return null; + } + String userAction = fsPermission.getUserAction() != null ? fsPermission.getUserAction().name() : null; + String grpAction = fsPermission.getGroupAction() != null ? fsPermission.getGroupAction().name() : null; + String otherAction = fsPermission.getOtherAction() != null ? 
fsPermission.getOtherAction().name() : null; + return HoodieFSPermission.newBuilder().setUserAction(userAction).setGroupAction(grpAction) + .setOtherAction(otherAction).setStickyBit(fsPermission.getStickyBit()).build(); + } + + public static HoodieFileStatus fromFileStatus(FileStatus fileStatus) { + if (null == fileStatus) { + return null; + } + + HoodieFileStatus fStatus = new HoodieFileStatus(); + try { + fStatus.setPath(fromPath(fileStatus.getPath())); + fStatus.setLength(fileStatus.getLen()); + fStatus.setIsDir(fileStatus.isDirectory()); + fStatus.setBlockReplication((int) fileStatus.getReplication()); + fStatus.setBlockSize(fileStatus.getBlockSize()); + fStatus.setModificationTime(fileStatus.getModificationTime()); + fStatus.setAccessTime(fileStatus.getModificationTime()); + fStatus.setSymlink(fileStatus.isSymlink() ? fromPath(fileStatus.getSymlink()) : null); + safeReadAndSetMetadata(fStatus, fileStatus); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + return fStatus; + } + + /** + * Used to safely handle FileStatus calls which might fail on some FileSystem implementation. + * (DeprecatedLocalFileSystem) + */ + private static void safeReadAndSetMetadata(HoodieFileStatus fStatus, FileStatus fileStatus) { + try { + fStatus.setOwner(fileStatus.getOwner()); + fStatus.setGroup(fileStatus.getGroup()); + fStatus.setPermission(fromFSPermission(fileStatus.getPermission())); + } catch (IllegalArgumentException ie) { + // Deprecated File System (testing) does not work well with this call + // skipping + } + } + + public static long getFileSize(FileSystem fs, Path path) throws IOException { + return fs.getFileStatus(path).getLen(); + } + + /** + * Given a base partition and a partition path, return relative path of partition path to the base path. 
+ */ + public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath) { + return FSUtils.getRelativePartitionPath(new StoragePath(basePath.toUri()), new StoragePath(fullPartitionPath.toUri())); + } + + /** + * Get the first part of the file name in the log file. That will be the fileId. Log files do not have instantTime in + * the file name. + */ + public static String getFileIdFromLogPath(Path path) { + Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(path.getName()); + if (!matcher.find()) { + throw new InvalidHoodiePathException(path.toString(), "LogFile"); + } + return matcher.group(1); + } + + /** + * Check if the file is a base file or a log file. Then get the fileId appropriately. + */ + public static String getFileIdFromFilePath(Path filePath) { + if (isLogFile(filePath)) { + return getFileIdFromLogPath(filePath); + } + return FSUtils.getFileId(filePath.getName()); + } + + public static boolean isBaseFile(Path path) { + String extension = FSUtils.getFileExtension(path.getName()); + return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension); + } + + public static boolean isLogFile(Path logPath) { + return FSUtils.isLogFile(new StoragePath(logPath.getName())); + } + + /** + * Returns true if the given path is a Base file or a Log file. + */ + public static boolean isDataFile(Path path) { + return isBaseFile(path) || isLogFile(path); + } + + /** + * Get the names of all the base and log files in the given partition path. 
+ */ + public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partitionPath) throws IOException { + final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values()) + .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new)); + final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension(); + + try { + return Arrays.stream(fs.listStatus(partitionPath, path -> { + String extension = FSUtils.getFileExtension(path.getName()); + return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension); + })).filter(FileStatus::isFile).toArray(FileStatus[]::new); + } catch (IOException e) { + // return empty FileStatus if partition does not exist already + if (!fs.exists(partitionPath)) { + return new FileStatus[0]; + } else { + throw e; + } + } + } + + public static Path constructAbsolutePathInHadoopPath(String basePath, String relativePartitionPath) { + return new Path(FSUtils.constructAbsolutePath(basePath, relativePartitionPath).toUri()); + } + + /** + * Get DFS full partition path (e.g. 
hdfs://ip-address:8020:/<absolute path>) + */ + public static String getDFSFullPartitionPath(FileSystem fs, Path fullPartitionPath) { + return fs.getUri() + fullPartitionPath.toUri().getRawPath(); + } + + public static <T> Map<String, T> parallelizeFilesProcess( + HoodieEngineContext hoodieEngineContext, + FileSystem fs, + int parallelism, + FSUtils.SerializableFunction<Pair<String, StorageConfiguration<Configuration>>, T> pairFunction, + List<String> subPaths) { + Map<String, T> result = new HashMap<>(); + if (subPaths.size() > 0) { + StorageConfiguration<Configuration> conf = HoodieStorageUtils.getStorageConfWithCopy(fs.getConf()); + int actualParallelism = Math.min(subPaths.size(), parallelism); + + hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(), + "Parallel listing paths " + String.join(",", subPaths)); + + result = hoodieEngineContext.mapToPair(subPaths, + subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))), + actualParallelism); + } + return result; + } + + /** + * Lists file status at a certain level in the directory hierarchy. + * <p> + * E.g., given "/tmp/hoodie_table" as the rootPath, and 3 as the expected level, + * this method gives back the {@link FileStatus} of all files under + * "/tmp/hoodie_table/[*]/[*]/[*]/" folders. + * + * @param hoodieEngineContext {@link HoodieEngineContext} instance. + * @param fs {@link FileSystem} instance. + * @param rootPath Root path for the file listing. + * @param expectLevel Expected level of directory hierarchy for files to be added. + * @param parallelism Parallelism for the file listing. + * @return A list of file status of files at the level. 
+ */ + + public static List<FileStatus> getFileStatusAtLevel( + HoodieEngineContext hoodieEngineContext, FileSystem fs, Path rootPath, + int expectLevel, int parallelism) { + List<String> levelPaths = new ArrayList<>(); + List<FileStatus> result = new ArrayList<>(); + levelPaths.add(rootPath.toString()); + + for (int i = 0; i <= expectLevel; i++) { + result = parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, + pairOfSubPathAndConf -> { + Path path = new Path(pairOfSubPathAndConf.getKey()); + try { + FileSystem fileSystem = path.getFileSystem(pairOfSubPathAndConf.getValue().unwrap()); + return Arrays.stream(fileSystem.listStatus(path)) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new HoodieIOException("Failed to list " + path, e); + } + }, + levelPaths) + .values().stream() + .flatMap(list -> list.stream()).collect(Collectors.toList()); + if (i < expectLevel) { + levelPaths = result.stream() + .filter(FileStatus::isDirectory) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + } + } + return result; + } + + public static Map<String, Boolean> deleteFilesParallelize( + HoodieTableMetaClient metaClient, + List<String> paths, + HoodieEngineContext context, + int parallelism, + boolean ignoreFailed) { + return HadoopFSUtils.parallelizeFilesProcess(context, + (FileSystem) metaClient.getStorage().getFileSystem(), + parallelism, + pairOfSubPathAndConf -> { + Path file = new Path(pairOfSubPathAndConf.getKey()); + try { + FileSystem fs = (FileSystem) metaClient.getStorage().getFileSystem(); + if (fs.exists(file)) { + return fs.delete(file, false); + } + return true; + } catch (IOException e) { + if (!ignoreFailed) { + throw new HoodieIOException("Failed to delete : " + file, e); + } else { + LOG.warn("Ignore failed deleting : " + file); + return true; + } + } + }, + paths); + } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index 3822535e7db..076cef09074 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -201,10 +201,10 @@ public class TestFSUtils extends HoodieCommonTestHarness { public void testGetRelativePartitionPath() { Path basePath = new Path("/test/apache"); Path partitionPath = new Path("/test/apache/hudi/sub"); - assertEquals("hudi/sub", FSUtils.getRelativePartitionPath(basePath, partitionPath)); + assertEquals("hudi/sub", HadoopFSUtils.getRelativePartitionPath(basePath, partitionPath)); Path nonPartitionPath = new Path("/test/something/else"); - assertThrows(IllegalArgumentException.class, () -> FSUtils.getRelativePartitionPath(basePath, nonPartitionPath)); + assertThrows(IllegalArgumentException.class, () -> HadoopFSUtils.getRelativePartitionPath(basePath, nonPartitionPath)); } @ParameterizedTest @@ -534,7 +534,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { StoragePath hoodieTempDir = getHoodieTempDir(); HoodieStorage storage = metaClient.getStorage(); prepareTestDirectory(storage, hoodieTempDir); - List<FileStatus> fileStatusList = FSUtils.getFileStatusAtLevel( + List<FileStatus> fileStatusList = HadoopFSUtils.getFileStatusAtLevel( new HoodieLocalEngineContext(storage.getConf()), (FileSystem) storage.getFileSystem(), new Path(baseUri), 3, 2); assertEquals(CollectionUtils.createImmutableSet( diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index fb06fb743d9..f575a3cc877 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ 
-25,7 +25,6 @@ import org.apache.hudi.avro.model.HoodieFSPermission; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.avro.model.HoodiePath; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; -import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter; import org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex; import org.apache.hudi.common.fs.FSUtils; @@ -59,6 +58,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; @@ -427,10 +427,10 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { Option<BaseFile> bootstrapBaseFile, boolean testBootstrap) { if (testBootstrap) { assertTrue(bootstrapBaseFile.isPresent()); - assertEquals(FileStatusUtils.toPath(srcFileStatus.getPath()), + assertEquals(HadoopFSUtils.toPath(srcFileStatus.getPath()), new Path(bootstrapBaseFile.get().getPath())); assertEquals(srcFileStatus.getPath(), - FileStatusUtils.fromPath(new Path(bootstrapBaseFile.get().getPath()))); + HadoopFSUtils.fromPath(new Path(bootstrapBaseFile.get().getPath()))); assertEquals(srcFileStatus.getModificationTime(), new Long(bootstrapBaseFile.get().getPathInfo().getModificationTime())); assertEquals(srcFileStatus.getBlockSize(), new Long(bootstrapBaseFile.get().getPathInfo().getBlockSize())); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 1192004c9e9..49f499756bb 100644 --- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -64,6 +64,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; @@ -444,7 +445,7 @@ public class HoodieTestTable { private Map<String, Long> getWrittenLogFiles(String instant, Map.Entry<String, List<String>> entry) { Map<String, Long> writtenLogFiles = new HashMap<>(); for (String fileName : entry.getValue()) { - if (FSUtils.isLogFile(new Path(fileName))) { + if (HadoopFSUtils.isLogFile(new Path(fileName))) { if (testTableState.getPartitionToLogFileInfoMap(instant) != null && testTableState.getPartitionToLogFileInfoMap(instant).containsKey(entry.getKey())) { List<Pair<String, Integer[]>> fileInfos = testTableState.getPartitionToLogFileInfoMap(instant).get(entry.getKey()); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHadoopConfigUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHadoopConfigUtils.java new file mode 100644 index 00000000000..01733d1b75d --- /dev/null +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHadoopConfigUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; + +import static org.apache.hudi.common.util.HadoopConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.HadoopConfigUtils.getRawValueWithAltKeys; +import static org.apache.hudi.common.util.TestConfigUtils.TEST_BOOLEAN_CONFIG_PROPERTY; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHadoopConfigUtils { + @Test + public void testGetRawValueWithAltKeysFromHadoopConf() { + Configuration conf = new Configuration(); + assertEquals(Option.empty(), getRawValueWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY)); + + boolean setValue = !Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue()); + conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue); + assertEquals(Option.of(String.valueOf(setValue)), + getRawValueWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY)); + + conf = new Configuration(); + conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0), setValue); + assertEquals(Option.of(String.valueOf(setValue)), + getRawValueWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY)); + } + + @Test + public void testGetBooleanWithAltKeysFromHadoopConf() { + Configuration conf = new Configuration(); + assertEquals(Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue()), + getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY)); + + boolean setValue = !Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue()); + 
conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue); + assertEquals(setValue, + getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY)); + + conf = new Configuration(); + conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0), setValue); + assertEquals(setValue, + getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY)); + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java index 48fd4bc29c9..d6a62f3a061 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java @@ -19,7 +19,6 @@ package org.apache.hudi.hadoop; import org.apache.hudi.common.engine.HoodieLocalEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -208,7 +207,7 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf)); } - String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder); + String partition = HadoopFSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder); List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList()); // populate the cache if (!hoodiePathCache.containsKey(folder.toString())) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index 2aee2edf135..7e74171c3f9 100644 --- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -18,7 +18,6 @@ package org.apache.hudi.hadoop.realtime; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; @@ -79,7 +78,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat { + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); // for log only split, set the parquet reader as empty. - if (FSUtils.isLogFile(realtimeSplit.getPath())) { + if (HadoopFSUtils.isLogFile(realtimeSplit.getPath())) { return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf)); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 33d25f1c21f..9db661daf81 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -184,7 +184,7 @@ public class HoodieInputFormatUtils { return getInputFormat(HoodieFileFormat.HFILE, realtime, conf); } // now we support read log file, try to find log file - if (FSUtils.isLogFile(new Path(path)) && realtime) { + if (HadoopFSUtils.isLogFile(new Path(path)) && realtime) { return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf); } throw new HoodieIOException("Hoodie InputFormat not implemented for base file of type " + extension); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java index 30ac00b0b0d..15a935bbd9e 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java @@ -60,7 +60,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath; import static org.apache.hudi.hadoop.testutils.InputFormatTestUtil.writeDataBlockToLogFile; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 7c0507bace6..c05e6e9d128 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -21,7 +21,6 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.config.HoodieCommonConfig; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; @@ -211,7 +210,7 @@ public class TestHoodieRealtimeRecordReader { // TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change // logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3)); FileSlice fileSlice = - new FileSlice(partitioned ? 
FSUtils.getRelativePartitionPath(new Path(basePath.toString()), + new FileSlice(partitioned ? HadoopFSUtils.getRelativePartitionPath(new Path(basePath.toString()), new Path(partitionDir.getAbsolutePath())) : "default", baseInstant, "fileid0"); logVersionsWithAction.forEach(logVersionWithAction -> { try { diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java index c857b61e0a4..c1bd8be8f57 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -24,7 +24,6 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -34,6 +33,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.SparkKeyGeneratorInterface; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; @@ -63,7 +63,7 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles, HoodieWriteConfig config) { 
String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue) - .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString())) + .flatMap(f -> f.stream().map(fs -> HadoopFSUtils.toPath(fs.getPath()).toString())) .toArray(String[]::new); // NOTE: "basePath" option is required for spark to discover the partition column diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala index b9119364715..dacfdef6739 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala @@ -52,7 +52,7 @@ class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder { val storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()) javaRdd.rdd.map(part => { val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap()) - FSUtils.getAllDataFilesInPartition(fs, FSUtils.constructAbsolutePathInHadoopPath(srcPath, part)) + HadoopFSUtils.getAllDataFilesInPartition(fs, HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part)) }).flatMap(_.toList) .filter(status => { val filePath = status.getPath diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index 2b371cf1db3..feec6c78ab2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -27,7 +27,6 @@ import 
org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector; import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector; import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.bootstrap.index.BootstrapIndex; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; @@ -175,7 +174,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { } else { df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath); } - String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles( + String filePath = HadoopFSUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles( metaClient, (FileSystem) metaClient.getStorage().getFileSystem(), srcPath, context).stream().findAny().map(p -> p.getValue().stream().findAny()) .orElse(null).get().getPath()).toString(); @@ -513,7 +512,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { @Override public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config) { - String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream()) + String filePath = HadoopFSUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream()) .findAny().get().getPath()).toString(); ParquetFileReader reader = null; JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); @@ -531,7 +530,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) { List<Pair<String, Path>> fullFilePathsWithPartition = 
partitionPaths.stream().flatMap(p -> p.getValue().stream() - .map(x -> Pair.of(p.getKey(), FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList()); + .map(x -> Pair.of(p.getKey(), HadoopFSUtils.toPath(x.getPath())))).collect(Collectors.toList()); return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> { try { Configuration conf = jsc.hadoopConfiguration(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java index fe105efff42..45921cd9568 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java @@ -28,7 +28,6 @@ import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector; import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector; import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.bootstrap.index.BootstrapIndex; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -50,6 +49,7 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.io.hadoop.OrcReaderIterator; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; @@ -155,7 +155,7 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase { } else { df.write().format("orc").mode(SaveMode.Overwrite).save(srcPath); } - String 
filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, (FileSystem) metaClient.getStorage().getFileSystem(), + String filePath = HadoopFSUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, (FileSystem) metaClient.getStorage().getFileSystem(), srcPath, context).stream().findAny().map(p -> p.getValue().stream().findAny()) .orElse(null).get().getPath()).toString(); Reader orcReader = @@ -401,12 +401,12 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase { public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config) { String[] filePaths = partitionPaths.stream().map(Pair::getValue) - .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString())) + .flatMap(f -> f.stream().map(fs -> HadoopFSUtils.toPath(fs.getPath()).toString())) .toArray(String[]::new); JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); - String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream()) + String filePath = HadoopFSUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream()) .findAny().get().getPath()).toString(); try { Reader orcReader = OrcFile.createReader( @@ -425,7 +425,7 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase { private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) { List<Pair<String, Path>> fullFilePathsWithPartition = partitionPaths.stream().flatMap(p -> p.getValue().stream() - .map(x -> Pair.of(p.getKey(), FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList()); + .map(x -> Pair.of(p.getKey(), HadoopFSUtils.toPath(x.getPath())))).collect(Collectors.toList()); return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> { try { Configuration conf = 
jsc.hadoopConfiguration(); diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java index 0c4305017f1..74fbe94aef7 100644 --- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java @@ -18,11 +18,11 @@ package org.apache.hudi.sync.adb; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.SchemaDifference; import org.apache.hudi.hive.util.HiveSchemaUtil; @@ -323,7 +323,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient { if (!StringUtils.isNullOrEmpty(str)) { List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str); Path storagePartitionPath = - FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), String.join("/", values)); + HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), String.join("/", values)); String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); partitions.put(values, fullStoragePartitionPath); @@ -359,7 +359,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient { .append(tableName).append("`").append(" add if not exists "); for (String partition : partitions) { String partitionClause = getPartitionClause(partition); - Path partitionPath = FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partition); + Path partitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partition); String fullPartitionPathStr = config.generateAbsolutePathStr(partitionPath); sqlBuilder.append(" partition (").append(partitionClause).append(") location '") .append(fullPartitionPathStr).append("' "); @@ -376,7 +376,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient { String alterTable = "alter table `" + tableName + "`"; for (String partition : partitions) { String partitionClause = getPartitionClause(partition); - Path partitionPath = FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partition); + Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partition); String fullPartitionPathStr = config.generateAbsolutePathStr(partitionPath); String changePartition = alterTable + " add if not exists partition (" + partitionClause + ") location '" + fullPartitionPathStr + "'"; @@ -455,13 +455,13 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient { List<PartitionEvent> events = new ArrayList<>(); for (String storagePartition : partitionStoragePartitions) { Path storagePartitionPath = - FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), storagePartition); + HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), storagePartition); String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); // Check if the partition values or if hdfs path is the same List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); if (config.getBoolean(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING)) { String partition = String.join("/", storagePartitionValues); - storagePartitionPath = FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partition); + storagePartitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partition); fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); } if (!storagePartitionValues.isEmpty()) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index b5471079524..c3db79fb368 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -21,6 +21,7 @@ package org.apache.hudi.hive.ddl; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HivePartitionUtil; @@ -205,7 +206,7 @@ public class HMSDDLExecutor implements DDLExecutor { partitionSd.setOutputFormat(sd.getOutputFormat()); partitionSd.setSerdeInfo(sd.getSerdeInfo()); String fullPartitionPath = - FSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), x).toString(); + FSUtils.constructAbsolutePath(syncConfig.getString(META_SYNC_BASE_PATH), x).toString(); List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x); partitionSd.setLocation(fullPartitionPath); partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null)); @@ -229,10 +230,10 @@ public class HMSDDLExecutor implements DDLExecutor { try { StorageDescriptor sd = client.getTable(databaseName, tableName).getSd(); List<Partition> partitionList = changedPartitions.stream().map(partition -> { - Path partitionPath = 
FSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); + Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition); String partitionScheme = partitionPath.toUri().getScheme(); String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) - ? FSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); + ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); StorageDescriptor partitionSd = sd.deepCopy(); partitionSd.setLocation(fullPartitionPath); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 194f99705bf..156353f0e24 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HiveSchemaUtil; @@ -162,7 +163,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor { for (int i = 0; i < partitions.size(); i++) { String partitionClause = getPartitionClause(partitions.get(i)); String fullPartitionPath = - FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partitions.get(i)).toString(); + 
FSUtils.constructAbsolutePath(config.getString(META_SYNC_BASE_PATH), partitions.get(i)).toString(); alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath) .append("' "); if ((i + 1) % batchSyncPartitionNum == 0) { @@ -211,10 +212,10 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor { String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER; for (String partition : partitions) { String partitionClause = getPartitionClause(partition); - Path partitionPath = FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partition); + Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), partition); String partitionScheme = partitionPath.toUri().getScheme(); String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) - ? FSUtils.getDFSFullPartitionPath(config.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); + ? 
HadoopFSUtils.getDFSFullPartitionPath(config.getHadoopFileSystem(), partitionPath) : partitionPath.toString(); String changePartition = alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'"; changePartitions.add(changePartition); diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index f2c67bc22e5..136c9c4e636 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -82,9 +82,9 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath; import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE; @@ -358,7 +358,7 @@ public class TestHiveSyncTool { // it and generate a partition update event for it. 
ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '" - + FSUtils.constructAbsolutePathInHadoopPath(basePath, "2050/1/1").toString() + "'"); + + FSUtils.constructAbsolutePath(basePath, "2050/1/1").toString() + "'"); hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.empty(), Option.empty()); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index ffb82021213..03085cc9d9b 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -21,8 +21,8 @@ package org.apache.hudi.sync.common; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.ParquetTableSchemaResolver; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.ParquetTableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; @@ -162,7 +162,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto List<PartitionEvent> events = new ArrayList<>(); for (String storagePartition : allPartitionsOnStorage) { Path storagePartitionPath = - FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), storagePartition); + HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), storagePartition); String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); // Check if the partition values or if hdfs path is the same List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); @@ -206,7 +206,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto List<PartitionEvent> events = new ArrayList<>(); for (String storagePartition : writtenPartitionsOnStorage) { Path storagePartitionPath = - FSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), storagePartition); + HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getString(META_SYNC_BASE_PATH), storagePartition); String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); // Check if the partition values or if hdfs path is the same List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java index e85324b7a77..35900fc75da 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.HadoopConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.hadoop.fs.HadoopFSUtils; @@ -199,7 +199,7 @@ public class HoodieSyncConfig extends HoodieConfig { private Configuration hadoopConf; 
public HoodieSyncConfig(Properties props) { - this(props, ConfigUtils.createHadoopConf(props)); + this(props, HadoopConfigUtils.createHadoopConf(props)); } public HoodieSyncConfig(Properties props, Configuration hadoopConf) { diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java index 729807d1b9b..c614a7ae82b 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java @@ -18,7 +18,7 @@ package org.apache.hudi.sync.common; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.HadoopConfigUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -35,7 +35,7 @@ public abstract class HoodieSyncTool implements AutoCloseable { protected Configuration hadoopConf; public HoodieSyncTool(Properties props) { - this(props, ConfigUtils.createHadoopConf(props)); + this(props, HadoopConfigUtils.createHadoopConf(props)); } public HoodieSyncTool(Properties props, Configuration hadoopConf) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableUtils.java index 7647f93c899..6f1be367c2e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableUtils.java @@ -38,7 +38,7 @@ public class HoodieDataTableUtils { String basePath) throws IOException { List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths() .stream().map(partitionPath -> - FSUtils.constructAbsolutePathInHadoopPath(basePath, partitionPath).toString()) + FSUtils.constructAbsolutePath(basePath, 
partitionPath).toString()) .collect(Collectors.toList()); return tableMetadata.getAllFilesInPartitions(allPartitionPaths).values().stream() .map(fileStatuses -> diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java index 94dde8ce41e..f7fdbcae64c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java @@ -22,7 +22,6 @@ package org.apache.hudi.utilities; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; @@ -290,13 +289,13 @@ public class HoodieRepairTool { HoodieEngineContext context, String basePathStr, int expectedLevel, int parallelism) { FileSystem fs = HadoopFSUtils.getFs(basePathStr, context.getStorageConf()); Path basePath = new Path(basePathStr); - return FSUtils.getFileStatusAtLevel( + return HadoopFSUtils.getFileStatusAtLevel( context, fs, basePath, expectedLevel, parallelism).stream() .filter(fileStatus -> { if (!fileStatus.isFile()) { return false; } - return FSUtils.isDataFile(fileStatus.getPath()); + return HadoopFSUtils.isDataFile(fileStatus.getPath()); }) .map(fileStatus -> fileStatus.getPath().toString()) .collect(Collectors.toList()); @@ -414,7 +413,7 @@ public class HoodieRepairTool { List<String> relativeFilePaths = listFilesFromBasePath( context, backupPathStr, partitionLevels, cfg.parallelism).stream() .map(filePath -> - FSUtils.getRelativePartitionPath(new Path(backupPathStr), new Path(filePath))) + HadoopFSUtils.getRelativePartitionPath(new 
Path(backupPathStr), new Path(filePath))) .collect(Collectors.toList()); return restoreFiles(relativeFilePaths); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java index 36050c926ab..9b3dcc6ffe1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java @@ -138,7 +138,7 @@ public class HoodieSnapshotCopier implements Serializable { context.foreach(filesToCopy, tuple -> { String partition = tuple._1(); Path sourceFilePath = new Path(tuple._2()); - Path toPartitionPath = FSUtils.constructAbsolutePathInHadoopPath(outputDir, partition); + Path toPartitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(outputDir, partition); FileSystem ifs = HadoopFSUtils.getFs(baseDir, storageConf.unwrapCopyAs(Configuration.class)); if (!ifs.exists(toPartitionPath)) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java index fd80d37a8d2..c6c8a393bbd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java @@ -228,7 +228,7 @@ public class HoodieSnapshotExporter { context.foreach(partitionAndFileList, partitionAndFile -> { String partition = partitionAndFile.getLeft(); Path sourceFilePath = new Path(partitionAndFile.getRight()); - Path toPartitionPath = FSUtils.constructAbsolutePathInHadoopPath(cfg.targetOutputPath, partition); + Path toPartitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(cfg.targetOutputPath, partition); FileSystem executorSourceFs = HadoopFSUtils.getFs(cfg.sourceBasePath, storageConf.newInstance()); FileSystem executorOutputFs = 
HadoopFSUtils.getFs(cfg.targetOutputPath, storageConf.newInstance());
