satishkotha commented on a change in pull request #2422:
URL: https://github.com/apache/hudi/pull/2422#discussion_r554244115
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -92,13 +95,36 @@
case HoodieTimeline.SAVEPOINT_ACTION:
// Nothing to be done here
break;
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
Review comment:
I think this is in JSON format in the active timeline (only archival uses the
Avro format, similar to other commits). Can you double-check while adding tests?
You may need a similar change in CleanPlanner.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
return earliestCommitToRetain;
}
+ public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
Review comment:
fileSystemView#getReplacedFileGroupsBeforeOrOn looks similar (we may
have to add another method, 'getReplacedFileGroupsBefore', to enforce strict
inequality). Maybe we can reuse that code? I can do this later if we file a code
cleanup task.
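To make the boundary difference concrete, here is a minimal sketch of inclusive ("before or on") versus strict ("before") filtering over replace-commit instant times. The class and method names are hypothetical, not Hudi's API, and the sketch assumes fixed-width timestamp strings so that plain String.compareTo matches chronological order.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Hudi) illustrating the two boundary semantics.
// Assumes fixed-width instant timestamps, so lexicographic order == time order.
final class ReplaceInstantFilter {
  private ReplaceInstantFilter() {}

  // Inclusive bound: mirrors the "BeforeOrOn" semantics.
  static List<String> beforeOrOn(List<String> replaceInstants, String bound) {
    return replaceInstants.stream()
        .filter(ts -> ts.compareTo(bound) <= 0)
        .collect(Collectors.toList());
  }

  // Strict bound: what a hypothetical "getReplacedFileGroupsBefore" would need,
  // matching the LESSER_THAN comparison used in getReplacedFileIdsToClean.
  static List<String> before(List<String> replaceInstants, String bound) {
    return replaceInstants.stream()
        .filter(ts -> ts.compareTo(bound) < 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> instants = List.of("001", "002", "003");
    System.out.println(beforeOrOn(instants, "002")); // [001, 002]
    System.out.println(before(instants, "002"));     // [001]
  }
}
```

With a bound of "002", the inclusive variant keeps the instant at the bound while the strict one drops it, which is the only difference a separate "Before" method would introduce.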
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
return earliestCommitToRetain;
}
+ public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+ HoodieCleaningPolicy policy = config.getCleanerPolicy();
+ HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+ // Determine which replace commits can be cleaned.
+ Stream<HoodieInstant> cleanableReplaceCommits;
+ if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+ if (!earliestInstantToRetain.isPresent()) {
+ LOG.info("Not enough instants to start cleaning replace commits");
+ return Collections.emptyMap();
+ }
+ // all replace commits, before the earliest instant we want to retain, should be eligible for deleting the
+ // replaced file groups.
+ cleanableReplaceCommits = replaceTimeline
+ .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+ earliestInstantToRetain.get().getTimestamp()))
+ .getInstants();
+ } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+ // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
+ // In other words, the file versions only apply to the active file groups.
+ cleanableReplaceCommits = replaceTimeline.getInstants();
+ } else {
+ throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
+ }
+
+ // merge everything and make a map full of file ids to be cleaned.
+ return cleanableReplaceCommits.map(instant -> {
+ try {
+ return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to deserialize " + instant, e);
+ }
+ }).reduce((leftMap, rightMap) -> {
+ rightMap.forEach((partition, fileIds) -> {
+ if (!leftMap.containsKey(partition)) {
+ leftMap.put(partition, fileIds);
+ } else {
+ // duplicates should not be possible, since a file group should be replaced only once
+ leftMap.get(partition).addAll(fileIds);
+ }
+ });
+ return leftMap;
+ }).orElse(new HashMap<>());
+ }
+
+ public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+ return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
Review comment:
I think getAllFileGroups doesn't return replaced file groups. It looks like
we may have to rename it to getAllActiveFileGroups to avoid confusion. Can you
make getAllFileGroupsIncludingReplaced public and use it instead?
We could also use getReplacedFileGroupsBeforeOrOn (or the new method
mentioned above), which returns HoodieFileGroups.
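A minimal sketch of why the listing matters, using a hypothetical in-memory view rather than Hudi's file-system view: if the listing hides replaced groups, filtering it by the replaced file ids yields nothing to delete. The method names mirror the ones discussed above, but their behaviour here is an assumption modelled on the comment.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical single-partition view (NOT Hudi's API): all file ids in the
// partition plus the subset that has been replaced.
final class PartitionViewSketch {
  private final List<String> allFileIds;
  private final Set<String> replacedFileIds;

  PartitionViewSketch(List<String> allFileIds, Set<String> replacedFileIds) {
    this.allFileIds = allFileIds;
    this.replacedFileIds = replacedFileIds;
  }

  // Models the behaviour described in the review: replaced groups are hidden.
  List<String> getAllActiveFileGroups() {
    return allFileIds.stream()
        .filter(id -> !replacedFileIds.contains(id))
        .collect(Collectors.toList());
  }

  // Models an "including replaced" listing: nothing is filtered out.
  List<String> getAllFileGroupsIncludingReplaced() {
    return allFileIds;
  }

  // Keeps only the listed groups that are eligible for cleaning. Since the
  // eligible ids are replaced groups, the active listing never contains them.
  static List<String> eligible(List<String> listing, List<String> eligibleFileIds) {
    return listing.stream().filter(eligibleFileIds::contains).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    PartitionViewSketch view = new PartitionViewSketch(List.of("fg1", "fg2", "fg3"), Set.of("fg2"));
    List<String> toClean = List.of("fg2");
    System.out.println(eligible(view.getAllActiveFileGroups(), toClean));            // []
    System.out.println(eligible(view.getAllFileGroupsIncludingReplaced(), toClean)); // [fg2]
  }
}
```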
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
return earliestCommitToRetain;
}
+ public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+ HoodieCleaningPolicy policy = config.getCleanerPolicy();
+ HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+ // Determine which replace commits can be cleaned.
+ Stream<HoodieInstant> cleanableReplaceCommits;
+ if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+ if (!earliestInstantToRetain.isPresent()) {
+ LOG.info("Not enough instants to start cleaning replace commits");
+ return Collections.emptyMap();
+ }
+ // all replace commits, before the earliest instant we want to retain, should be eligible for deleting the
+ // replaced file groups.
+ cleanableReplaceCommits = replaceTimeline
+ .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+ earliestInstantToRetain.get().getTimestamp()))
+ .getInstants();
+ } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+ // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
+ // In other words, the file versions only apply to the active file groups.
+ cleanableReplaceCommits = replaceTimeline.getInstants();
+ } else {
+ throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
+ }
+
+ // merge everything and make a map full of file ids to be cleaned.
+ return cleanableReplaceCommits.map(instant -> {
+ try {
+ return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to deserialize " + instant, e);
+ }
+ }).reduce((leftMap, rightMap) -> {
+ rightMap.forEach((partition, fileIds) -> {
+ if (!leftMap.containsKey(partition)) {
+ leftMap.put(partition, fileIds);
+ } else {
+ // duplicates should not be possible, since a file group should be replaced only once
+ leftMap.get(partition).addAll(fileIds);
+ }
+ });
+ return leftMap;
+ }).orElse(new HashMap<>());
+ }
+
+ public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+ return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
+ .filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
+ .flatMap(HoodieFileGroup::getAllFileSlices)
+ .flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
+ .collect(Collectors.toList());
Review comment:
In line 220, we honor savepointed files when cleaning regular commits,
but we might be missing that for replace commits. Is that fine for now?
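One way the replace-commit path could honour savepoints, as a minimal sketch: skip any slice of an eligible (replaced) file group whose writing instant is savepointed. The Slice type and method names are hypothetical stand-ins, not Hudi's FileSlice or CleanPlanner API, and keying savepoints on the slice's instant time is an assumption.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical cleaner for replaced file groups that honours savepoints.
final class ReplacedGroupCleaner {
  private ReplacedGroupCleaner() {}

  // Simplified stand-in for a file slice (not Hudi's FileSlice): the file group
  // it belongs to, the instant that wrote it, and the paths it owns.
  static final class Slice {
    final String fileId;
    final String instantTime;
    final List<String> paths;

    Slice(String fileId, String instantTime, List<String> paths) {
      this.fileId = fileId;
      this.instantTime = instantTime;
      this.paths = paths;
    }
  }

  // Paths to delete for eligible (replaced) file groups, skipping any slice
  // written by a savepointed instant so the savepoint remains restorable.
  static List<String> deletePaths(List<Slice> slices,
                                  List<String> eligibleFileIds,
                                  Set<String> savepointedInstants) {
    return slices.stream()
        .filter(s -> eligibleFileIds.contains(s.fileId))
        .filter(s -> !savepointedInstants.contains(s.instantTime))
        .flatMap(s -> s.paths.stream())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Slice> slices = List.of(
        new Slice("fg1", "001", List.of("p/fg1_001.parquet")),
        new Slice("fg1", "002", List.of("p/fg1_002.parquet")),
        new Slice("fg2", "001", List.of("p/fg2_001.parquet")));
    // fg1 was replaced; instant 002 is savepointed, so only fg1's 001 slice goes.
    System.out.println(deletePaths(slices, List.of("fg1"), Set.of("002"))); // [p/fg1_001.parquet]
  }
}
```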
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -92,13 +95,36 @@
case HoodieTimeline.SAVEPOINT_ACTION:
// Nothing to be done here
break;
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
+ timeline.getInstantDetails(instant).get());
+ records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
+ break;
default:
throw new HoodieException("Unknown type of action " + instant.getAction());
}
return records;
}
+ public static List<HoodieRecord> convertMetadataToRecords(HoodieReplaceCommitMetadata replaceCommitMetadata, String instantTime) {
Review comment:
If HoodieReplaceCommitMetadata is in JSON format, we could reuse
convertMetadataToRecords(HoodieCommitMetadata, String).
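If the JSON replace-commit model is a subtype of the commit model (an assumption here, in the spirit of the comment), a single converter typed against the base class accepts both. A minimal sketch with hypothetical stand-in classes; the string "records" are placeholders, not HoodieRecords, and anything specific to replaced file ids would still need its own handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical converter: one method typed against the base metadata class.
final class MetadataConverter {
  private MetadataConverter() {}

  // Emits one placeholder "record" per partition (instant:partition:fileCount).
  // The replace-commit sketch below is a subtype, so it is accepted here
  // without a separate overload.
  static List<String> convertMetadataToRecords(CommitMetadataSketch metadata, String instantTime) {
    List<String> records = new ArrayList<>();
    metadata.partitionToWrittenFiles.forEach((partition, files) ->
        records.add(instantTime + ":" + partition + ":" + files.size()));
    return records;
  }

  public static void main(String[] args) {
    ReplaceCommitMetadataSketch replace = new ReplaceCommitMetadataSketch(
        Map.of("2021/01/10", List.of("fg3_003.parquet")),
        Map.of("2021/01/10", List.of("fg1", "fg2")));
    System.out.println(convertMetadataToRecords(replace, "003")); // [003:2021/01/10:1]
  }
}

// Hypothetical stand-in for the JSON commit-metadata model (written files only).
class CommitMetadataSketch {
  final Map<String, List<String>> partitionToWrittenFiles;

  CommitMetadataSketch(Map<String, List<String>> partitionToWrittenFiles) {
    this.partitionToWrittenFiles = partitionToWrittenFiles;
  }
}

// Hypothetical replace-commit subtype that additionally tracks replaced file ids.
class ReplaceCommitMetadataSketch extends CommitMetadataSketch {
  final Map<String, List<String>> partitionToReplacedFileIds;

  ReplaceCommitMetadataSketch(Map<String, List<String>> written,
                              Map<String, List<String>> replaced) {
    super(written);
    this.partitionToReplacedFileIds = replaced;
  }
}
```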
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -81,9 +83,21 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
- Map<String, List<HoodieCleanFileInfo>> cleanOps = context
- .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
- .stream()
+ // Compute the file paths, to be cleaned in each valid file group
+ Stream<Pair<String, List<CleanFileInfo>>> cleanInfos = context.map(partitionsToClean,
+ partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
Review comment:
Is it possible to have a single call to planner.getDeletePaths return all
files to be cleaned? That seems like a better abstraction at a high level to me,
though I'm not sure whether separating them has disadvantages.
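A minimal sketch of the single-entry-point idea: callers ask once per partition and receive every path to delete, whether it comes from normal version retention or from replaced file groups. The class is hypothetical and both sources are plain maps instead of Hudi views.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical planner facade (not Hudi's CleanPlanner) with one delete-path API.
final class UnifiedCleanPlanner {
  private final Map<String, List<String>> regularDeletePaths;  // partition -> paths
  private final Map<String, List<String>> replacedDeletePaths; // partition -> paths

  UnifiedCleanPlanner(Map<String, List<String>> regular, Map<String, List<String>> replaced) {
    this.regularDeletePaths = regular;
    this.replacedDeletePaths = replaced;
  }

  // Single entry point: regular paths first, then paths from replaced groups.
  List<String> getDeletePaths(String partitionPath) {
    List<String> all = new ArrayList<>(
        regularDeletePaths.getOrDefault(partitionPath, Collections.emptyList()));
    all.addAll(replacedDeletePaths.getOrDefault(partitionPath, Collections.emptyList()));
    return all;
  }

  public static void main(String[] args) {
    UnifiedCleanPlanner planner = new UnifiedCleanPlanner(
        Map.of("2021/01/10", List.of("a_001.parquet")),
        Map.of("2021/01/10", List.of("b_001.parquet"), "2021/01/11", List.of("c_001.parquet")));
    System.out.println(planner.getDeletePaths("2021/01/10")); // [a_001.parquet, b_001.parquet]
    System.out.println(planner.getDeletePaths("2021/01/11")); // [c_001.parquet]
  }
}
```

A possible trade-off (not verified against this PR) is that the set of partitions passed to getDeletePaths would then have to cover partitions touched only by replace commits.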
----------------------------------------------------------------
This is an automated message from the Apache Git Service.