This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fdce2b80c8 [HUDI-5279] move logic for deleting active instant to HoodieActiveTimeline (#7196)
fdce2b80c8 is described below
commit fdce2b80c8aec49be0e3506b7120283861eafa6e
Author: Yann Byron <[email protected]>
AuthorDate: Wed Nov 30 18:11:23 2022 +0800
[HUDI-5279] move logic for deleting active instant to HoodieActiveTimeline (#7196)
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 40 ++++++++++++----------
.../table/timeline/HoodieActiveTimeline.java | 14 +++++---
2 files changed, 30 insertions(+), 24 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index bb814f817d..a61a5c9008 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -537,15 +537,14 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants,
HoodieEngineContext context) throws IOException {
LOG.info("Deleting instants " + archivedInstants);
- List<String> pendingInstantFiles = new ArrayList<>();
- List<String> completedInstantFiles = new ArrayList<>();
+ List<HoodieInstant> pendingInstants = new ArrayList<>();
+ List<HoodieInstant> completedInstants = new ArrayList<>();
for (HoodieInstant instant : archivedInstants) {
- String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString();
if (instant.isCompleted()) {
- completedInstantFiles.add(filePath);
+ completedInstants.add(instant);
} else {
- pendingInstantFiles.add(filePath);
+ pendingInstants.add(instant);
}
}
@@ -556,27 +555,30 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
// other monitors on the timeline(such as the compaction or clustering
services) would
// mistakenly recognize the pending file as a pending operation,
// then all kinds of weird bugs occur.
- boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles);
- success &= deleteArchivedInstantFiles(context, success, completedInstantFiles);
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ if (!pendingInstants.isEmpty()) {
+ context.foreach(
+ pendingInstants,
+ instant -> activeTimeline.deleteInstantFileIfExists(instant),
+ Math.min(pendingInstants.size(), config.getArchiveDeleteParallelism())
+ );
+ }
+ if (!completedInstants.isEmpty()) {
+ context.foreach(
+ completedInstants,
+ instant -> activeTimeline.deleteInstantFileIfExists(instant),
+ Math.min(completedInstants.size(), config.getArchiveDeleteParallelism())
+ );
+ }
// Remove older meta-data from auxiliary path too
Option<HoodieInstant> latestCommitted =
Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted()
&& (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
||
(i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
LOG.info("Latest Committed Instant=" + latestCommitted);
if (latestCommitted.isPresent()) {
- success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
+ return deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
}
- return success;
- }
-
- private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) {
- Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false);
-
- for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
- LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
- success &= result.getValue();
- }
- return success;
+ return true;
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index be351ab8e8..0ef46031ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -266,22 +266,26 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
deleteInstantFile(instant);
}
+ /**
+ * Note: This method should only be used in the case that delete requested/inflight instant or empty clean instant,
+ * and completed commit instant in an archive operation.
+ */
public void deleteInstantFileIfExists(HoodieInstant instant) {
LOG.info("Deleting instant " + instant);
- Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName());
+ Path commitFilePath = getInstantFileNamePath(instant.getFileName());
try {
- if (metaClient.getFs().exists(inFlightCommitFilePath)) {
- boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false);
+ if (metaClient.getFs().exists(commitFilePath)) {
+ boolean result = metaClient.getFs().delete(commitFilePath, false);
if (result) {
LOG.info("Removed instant " + instant);
} else {
throw new HoodieIOException("Could not delete instant " + instant);
}
} else {
- LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not exist");
+ LOG.warn("The commit " + commitFilePath + " to remove does not exist");
}
} catch (IOException e) {
- throw new HoodieIOException("Could not remove inflight commit " + inFlightCommitFilePath, e);
+ throw new HoodieIOException("Could not remove commit " + commitFilePath, e);
}
}