yihua commented on code in PR #11805:
URL: https://github.com/apache/hudi/pull/11805#discussion_r1744825114
##########
hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java:
##########
@@ -426,6 +426,26 @@ public List<StoragePathInfo>
listDirectEntries(List<StoragePath> pathList) throw
return result;
}
+ /**
+ * Lists the file info of the direct files/directories in the given list of
paths
+ * and filters the results, if the paths are directory.
+ *
+ * @param pathList given path list.
+ * @param filter filter to apply.
+ * @return the list of path info of the files/directories in the given paths.
+ * @throws FileNotFoundException when the path does not exist.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList,
+ StoragePathFilter filter)
throws IOException {
+ List<StoragePathInfo> result = new ArrayList<>();
+ for (StoragePath path : pathList) {
+ result.addAll(listDirectEntries(path, filter));
+ }
+ return result;
Review Comment:
Use `pathList.stream().flatMap(...)` here instead of the explicit loop.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -272,17 +279,25 @@ private FileStatus[]
fetchFilesFromCommitMetadata(HoodieInstant instantToRollbac
* @param partitionPath
* @param basePath
* @param baseFileExtension
- * @param fs
+ * @param storage
* @return
* @throws IOException
*/
- private FileStatus[] fetchFilesFromListFiles(HoodieInstant
instantToRollback, String partitionPath, String basePath,
- String baseFileExtension,
FileSystem fs)
- throws IOException {
- SerializablePathFilter pathFilter =
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
- Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);
+ private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant
instantToRollback,
+ String partitionPath,
+ String basePath,
+ String
baseFileExtension,
+ HoodieStorage storage)
{
+ StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
instantToRollback.getTimestamp());
+ List<StoragePath> filePaths = listFilesToBeDeleted(basePath,
partitionPath);
+
+ try {
+ return storage.listDirectEntries(filePaths, pathFilter);
+ } catch (IOException ioe) {
Review Comment:
Same here: Should this still throw `IOException` as before?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -221,49 +216,61 @@ private static String formatDeletePath(String path) {
return path.substring(path.indexOf(":") + 1);
}
- private FileStatus[] listBaseFilesToBeDeleted(String commit, String
basefileExtension, String partitionPath,
- FileSystem fs) throws
IOException {
+ private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
+ String
basefileExtension,
+ String partitionPath,
+ HoodieStorage
storage) throws IOException {
LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
- PathFilter filter = (path) -> {
+ StoragePathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
- return
fs.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(),
partitionPath), filter);
+ return
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath), filter);
}
- private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback,
String partitionPath, String basePath,
- String baseFileExtension,
FileSystem fs,
- Option<HoodieCommitMetadata>
commitMetadataOptional,
- Boolean isCommitMetadataCompleted,
- HoodieTableType tableType) throws
IOException {
+ private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant
instantToRollback,
+ String partitionPath,
String basePath,
+ String
baseFileExtension, HoodieStorage storage,
+
Option<HoodieCommitMetadata> commitMetadataOptional,
+ Boolean
isCommitMetadataCompleted,
+ HoodieTableType
tableType) throws IOException {
// go w/ commit metadata only for COW table. for MOR, we need to get
associated log files when commit corresponding to base file is rolledback.
if (isCommitMetadataCompleted && tableType ==
HoodieTableType.COPY_ON_WRITE) {
return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(),
- baseFileExtension, fs);
+ baseFileExtension, storage);
} else {
- return fetchFilesFromListFiles(instantToRollback, partitionPath,
basePath, baseFileExtension, fs);
+ return fetchFilesFromListFiles(instantToRollback, partitionPath,
basePath, baseFileExtension, storage);
}
}
- private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback, String partitionPath,
- String basePath,
HoodieCommitMetadata commitMetadata,
- String baseFileExtension,
FileSystem fs)
- throws IOException {
- SerializablePathFilter pathFilter =
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
- Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata,
partitionPath);
-
- return fs.listStatus(Arrays.stream(filePaths).filter(entry -> {
- try {
- return fs.exists(entry);
- } catch (Exception e) {
- LOG.error("Exists check failed for " + entry.toString(), e);
- }
- // if any Exception is thrown, do not ignore. let's try to add the file
of interest to be deleted. we can't miss any files to be rolled back.
- return true;
- }).toArray(Path[]::new), pathFilter);
+ private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback,
+ String
partitionPath,
+ String basePath,
+
HoodieCommitMetadata commitMetadata,
+ String
baseFileExtension,
+ HoodieStorage
storage) {
+ StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
+ instantToRollback.getTimestamp());
+ List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath,
commitMetadata, partitionPath)
+ .filter(entry -> {
+ try {
+ return storage.exists(entry);
+ } catch (Exception e) {
+ LOG.error("Exists check failed for " + entry.toString(), e);
+ }
+ // if any Exception is thrown, do not ignore. let's try to add the
file of interest to be deleted. we can't miss any files to be rolled back.
+ return true;
+ }).collect(Collectors.toList());
+ try {
+ return storage.listDirectEntries(filePaths, pathFilter);
+ } catch (IOException ioe) {
+ LOG.error("Failed to get StoragePathInfo", ioe);
+ }
+
+ return new ArrayList<>();
Review Comment:
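Should this still throw `IOException` as before? As written, the error is only logged and an empty list is returned, so a listing failure is indistinguishable from "no files to roll back".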
##########
hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java:
##########
@@ -426,6 +426,26 @@ public List<StoragePathInfo>
listDirectEntries(List<StoragePath> pathList) throw
return result;
}
+ /**
+ * Lists the file info of the direct files/directories in the given list of
paths
+ * and filters the results, if the paths are directory.
+ *
+ * @param pathList given path list.
+ * @param filter filter to apply.
+ * @return the list of path info of the files/directories in the given paths.
+ * @throws FileNotFoundException when the path does not exist.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList,
Review Comment:
Let's add unit tests on this new API in `TestHoodieStorageBase#testListing`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java:
##########
@@ -267,8 +263,9 @@ private static Option<HoodieConsistentHashingMetadata>
loadMetadataFromGivenFile
* @param partition Partition metadata file belongs to
* @return true if hashing metadata file is latest else false
*/
- private static boolean recommitMetadataFile(HoodieTable table, FileStatus
metaFile, String partition) {
- Path partitionPath = new
Path(FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(),
partition).toUri());
+ private static boolean recommitMetadataFile(HoodieTable table,
StoragePathInfo metaFile, String partition) {
+ StoragePath partitionPath = new StoragePath(
+ FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(),
partition).toUri());
Review Comment:
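This seems effectively unchanged: the result of `FSUtils.constructAbsolutePath` is still wrapped in a new path object via `toUri()`. It can be assigned directly: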
```suggestion
StoragePath partitionPath =
FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition);
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java:
##########
@@ -164,10 +163,10 @@ private boolean checkIfExpired() {
}
private void acquireLock() {
- try (FSDataOutputStream fos = fs.create(this.lockFile, false)) {
- if (!fs.exists(this.lockFile)) {
+ try (OutputStream os = storage.create(this.lockFile, false)) {
+ if (!storage.exists(this.lockFile)) {
initLockInfo();
- fos.writeBytes(lockInfo.toString());
+ os.write(lockInfo.toString().getBytes());
Review Comment:
Use `StringUtils#getUTF8Bytes` instead of `lockInfo.toString().getBytes()`, so the encoding does not depend on the platform default charset.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]