vinothchandar commented on a change in pull request #2421:
URL: https://github.com/apache/hudi/pull/2421#discussion_r555199075
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,18 +264,33 @@ private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
partitionToDeletedFiles.get(partition).addAll(deletedFiles);
}
- if (!pm.getAppendFiles().isEmpty()) {
+ if (hasRollbackLogFiles) {
if (!partitionToAppendedFiles.containsKey(partition)) {
partitionToAppendedFiles.put(partition, new HashMap<>());
}
// Extract appended file name from the absolute paths saved in getAppendFiles()
- pm.getAppendFiles().forEach((path, size) -> {
+ pm.getRollbackLogFiles().forEach((path, size) -> {
partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
return size + oldSize;
Review comment:
We should change it here too: pick the largest size rather than summing.
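A minimal sketch of what the suggested change could look like, assuming the reviewer's intent is that a log file reported twice should keep its largest reported size instead of summing the sizes. The class and method names here (`MergeLargestSketch`, `mergeKeepLargest`) are hypothetical stand-ins, not Hudi code:

```java
import java.util.HashMap;
import java.util.Map;

public class MergeLargestSketch {
    // Hypothetical sketch: when the same log file name appears more than once,
    // keep the largest reported size rather than summing, which could
    // double-count a file if a rollback is itself retried.
    static void mergeKeepLargest(Map<String, Long> appendedFiles, String fileName, long size) {
        // Map.merge with Math::max keeps whichever size is larger.
        appendedFiles.merge(fileName, size, Math::max);
    }

    public static void main(String[] args) {
        Map<String, Long> appended = new HashMap<>();
        mergeKeepLargest(appended, ".f1.log.1", 100L);
        mergeKeepLargest(appended, ".f1.log.1", 250L);
        mergeKeepLargest(appended, ".f1.log.1", 150L);
        System.out.println(appended.get(".f1.log.1")); // prints 250
    }
}
```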
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -424,10 +425,29 @@ public static boolean isLogFile(Path logPath) {
*/
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId,
final String logFileExtension, final String baseCommitTime) throws IOException {
- return Arrays
- .stream(fs.listStatus(partitionPath,
- path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)))
- .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
+ try {
+ return Arrays
+ .stream(fs.listStatus(partitionPath,
+ path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)))
+ .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
+ } catch (FileNotFoundException e) {
+ return Stream.<HoodieLogFile>builder().build();
+ }
+ }
+
+ /**
+ * Get all the log files for the passed in FileId in the partition path.
+ */
+ public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath,
Review comment:
Is this new overload used anywhere? Need to check.
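For context on the hunk above: the pattern the patch introduces is to treat a missing partition path (which `FileSystem.listStatus` reports by throwing `FileNotFoundException`) as "no log files" and return an empty stream. A small self-contained sketch of that pattern, with a hypothetical `listOrEmpty` standing in for the real listing call:

```java
import java.io.FileNotFoundException;
import java.util.stream.Stream;

public class EmptyStreamOnMissingPartition {
    // Hypothetical stand-in for the FileSystem listing in FSUtils: when the
    // partition path does not exist, the listing throws FileNotFoundException,
    // and the caller maps that to an empty stream instead of failing.
    static Stream<String> listOrEmpty(boolean partitionExists) {
        try {
            if (!partitionExists) {
                throw new FileNotFoundException("partition path missing");
            }
            return Stream.of(".f1.log.1", ".f1.log.2");
        } catch (FileNotFoundException e) {
            // Equivalent in effect to Stream.<HoodieLogFile>builder().build()
            return Stream.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(listOrEmpty(true).count());  // prints 2
        System.out.println(listOrEmpty(false).count()); // prints 0
    }
}
```

Note that `Stream.empty()` is the more idiomatic way to produce an empty stream than building one with `Stream.builder()`.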
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,6 +118,12 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
+ // collect all log files that is supposed to be deleted with this rollback
+ Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
Review comment:
It's done below anyway, so all good.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,6 +118,12 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
+ // collect all log files that is supposed to be deleted with this rollback
+ Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
Review comment:
I think we should guard the Option variables here, `rollbackRequest.getFileId()` and `rollbackRequest.getLatestBaseInstant()`, with an `isPresent()` check.
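A minimal sketch of the guard the reviewer is asking for. Hudi's own `Option` type behaves like `java.util.Optional` used here; `OptionGuardSketch` and `listLogFilesGuarded` are hypothetical names, not the actual rollback helper code:

```java
import java.util.Optional;

public class OptionGuardSketch {
    // Hypothetical stand-in for the rollback request fields: check
    // isPresent() before calling get(), instead of dereferencing a
    // possibly-empty Option and throwing at runtime.
    static String listLogFilesGuarded(Optional<String> fileId, Optional<String> latestBaseInstant) {
        if (!fileId.isPresent() || !latestBaseInstant.isPresent()) {
            // Skip the listing when either value is absent.
            return "skipped";
        }
        return "list logs for " + fileId.get() + "@" + latestBaseInstant.get();
    }

    public static void main(String[] args) {
        System.out.println(listLogFilesGuarded(Optional.of("f1"), Optional.of("001"))); // prints "list logs for f1@001"
        System.out.println(listLogFilesGuarded(Optional.empty(), Optional.of("001")));  // prints "skipped"
    }
}
```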
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]