This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a640493f74e [HUDI-6580] Duplicate calculation of
earliestInstantToRetain when generating a clean plan (#9264)
a640493f74e is described below
commit a640493f74ecfa4462d96ecec2dab5952188fca7
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Mon Jul 24 12:42:16 2023 +0800
[HUDI-6580] Duplicate calculation of earliestInstantToRetain when
generating a clean plan (#9264)
Remove duplicate calculation of earliestInstantToRetain when generating a
clean plan.
Duplicate calculation may cause inconsistent state: the value was recomputed
for every partition, so if more instants complete between iterations, the
recomputed earliestInstantToRetain can differ from the one that was actually
used to clean the initial partitions.
---
.../action/clean/CleanPlanActionExecutor.java | 4 +--
.../hudi/table/action/clean/CleanPlanner.java | 29 +++++++++++-----------
2 files changed, 17 insertions(+), 16 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index b494df42b49..57b583f54b7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -112,14 +112,14 @@ public class CleanPlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
return
HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
}
LOG.info("Earliest commit to retain for clean : " +
(earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null"));
- LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ",
with policy " + config.getCleanerPolicy());
+ LOG.info("Total partitions to clean : " + partitionsToClean.size() + ",
with policy " + config.getCleanerPolicy());
int cleanerParallelism = Math.min(partitionsToClean.size(),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
context.setJobStatus(this.getClass().getSimpleName(), "Generating list
of file slices to be cleaned: " + config.getTableName());
Map<String, Pair<Boolean, List<CleanFileInfo>>>
cleanOpsWithPartitionMeta = context
- .map(partitionsToClean, partitionPathToClean ->
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),
cleanerParallelism)
+ .map(partitionsToClean, partitionPathToClean ->
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean,
earliestInstant)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 80aa7b31624..d89c876bdfc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -291,8 +291,8 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
return Pair.of(toDeletePartition, deletePaths);
}
- private Pair<Boolean, List<CleanFileInfo>>
getFilesToCleanKeepingLatestCommits(String partitionPath) {
- return getFilesToCleanKeepingLatestCommits(partitionPath,
config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+ private Pair<Boolean, List<CleanFileInfo>>
getFilesToCleanKeepingLatestCommits(String partitionPath, Option<HoodieInstant>
earliestCommitToRetain) {
+ return getFilesToCleanKeepingLatestCommits(partitionPath,
config.getCleanerCommitsRetained(), earliestCommitToRetain,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
}
/**
@@ -312,7 +312,8 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
* @return A {@link Pair} whose left is boolean indicating whether partition
itself needs to be deleted,
* and right is a list of {@link CleanFileInfo} about the files in
the partition that needs to be deleted.
*/
- private Pair<Boolean, List<CleanFileInfo>>
getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained,
HoodieCleaningPolicy policy) {
+ private Pair<Boolean, List<CleanFileInfo>>
getFilesToCleanKeepingLatestCommits(String partitionPath,
+ int commitsRetained, Option<HoodieInstant> earliestCommitToRetain,
HoodieCleaningPolicy policy) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " +
commitsRetained + " commits. ");
List<CleanFileInfo> deletePaths = new ArrayList<>();
@@ -324,10 +325,9 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
// determine if we have enough commits, to start cleaning.
boolean toDeletePartition = false;
if (commitTimeline.countInstants() > commitsRetained) {
- Option<HoodieInstant> earliestCommitToRetainOption =
getEarliestCommitToRetain();
- HoodieInstant earliestCommitToRetain =
earliestCommitToRetainOption.get();
+ HoodieInstant earliestInstant = earliestCommitToRetain.get();
// all replaced file groups before earliestCommitToRetain are eligible
to clean
- deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles,
partitionPath, earliestCommitToRetainOption));
+ deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles,
partitionPath, earliestCommitToRetain));
// add active files
List<HoodieFileGroup> fileGroups =
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
@@ -339,7 +339,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
String lastVersion = fileSliceList.get(0).getBaseInstantTime();
String lastVersionBeforeEarliestCommitToRetain =
- getLatestVersionBeforeCommit(fileSliceList,
earliestCommitToRetain);
+ getLatestVersionBeforeCommit(fileSliceList, earliestInstant);
// Ensure there are more than 1 version of the file (we only clean old
files from updates)
// i.e., always spare the last commit.
@@ -372,7 +372,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
// Always keep the last commit
if (!isFileSliceNeededForPendingMajorOrMinorCompaction(aSlice) &&
HoodieTimeline
- .compareTimestamps(earliestCommitToRetain.getTimestamp(),
HoodieTimeline.GREATER_THAN, fileCommitTime)) {
+ .compareTimestamps(earliestInstant.getTimestamp(),
HoodieTimeline.GREATER_THAN, fileCommitTime)) {
// this is a commit, that should be cleaned.
aFile.ifPresent(hoodieDataFile -> {
deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(),
false));
@@ -393,7 +393,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
// mark it to be deleted
if (fileGroups.isEmpty()
&& !hasPendingFiles(partitionPath)
- && noSubsequentReplaceCommit(earliestCommitToRetain.getTimestamp(),
partitionPath)) {
+ && noSubsequentReplaceCommit(earliestInstant.getTimestamp(),
partitionPath)) {
toDeletePartition = true;
}
}
@@ -428,10 +428,11 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
* This policy gives much more flexibility to users for retaining data for
running incremental queries as compared to
* KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
* @param partitionPath partition path to check
+ * @param earliestCommitToRetain earliest commit to retain
* @return list of files to clean
*/
- private Pair<Boolean, List<CleanFileInfo>>
getFilesToCleanKeepingLatestHours(String partitionPath) {
- return getFilesToCleanKeepingLatestCommits(partitionPath, 0,
HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
+ private Pair<Boolean, List<CleanFileInfo>>
getFilesToCleanKeepingLatestHours(String partitionPath, Option<HoodieInstant>
earliestCommitToRetain) {
+ return getFilesToCleanKeepingLatestCommits(partitionPath, 0,
earliestCommitToRetain, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
}
private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String>
savepointedFiles, String partitionPath, Option<HoodieInstant>
earliestCommitToRetain) {
@@ -485,15 +486,15 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
/**
* Returns files to be cleaned for the given partitionPath based on cleaning
policy.
*/
- public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String
partitionPath) {
+ public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String
partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
HoodieCleaningPolicy policy = config.getCleanerPolicy();
Pair<Boolean, List<CleanFileInfo>> deletePaths;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
- deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
+ deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath,
earliestCommitToRetain);
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
- deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
+ deletePaths = getFilesToCleanKeepingLatestHours(partitionPath,
earliestCommitToRetain);
} else {
throw new IllegalArgumentException("Unknown cleaning policy : " +
policy.name());
}