This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d7c16f56f4f [HUDI-6718] Check Timeline Before Transitioning Inflight
Clean in Multiwriter Scenario (#9468)
d7c16f56f4f is described below
commit d7c16f56f4f9dfa3a160dac459ae11944f922ec8
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Aug 23 22:30:41 2023 -0400
[HUDI-6718] Check Timeline Before Transitioning Inflight Clean in
Multiwriter Scenario (#9468)
- If two cleans start at nearly the same time, they will both attempt to
execute the same clean instants. This does not cause any data corruption, but
it will cause a writer to fail when it attempts to create the commit in the
timeline. This is because the commit will have already been written by the
first writer. Now, we check the timeline before transitioning state.
Co-authored-by: Jonathan Vexler <=>
---
.../hudi/table/action/clean/CleanActionExecutor.java | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 05e1056324a..c931e7bce9d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -261,8 +261,10 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
LOG.info("Finishing previously unfinished cleaner instant=" +
hoodieInstant);
try {
cleanMetadataList.add(runPendingClean(table, hoodieInstant));
+ } catch (HoodieIOException e) {
+ checkIfOtherWriterCommitted(hoodieInstant, e);
} catch (Exception e) {
- LOG.warn("Failed to perform previous clean operation, instant: " +
hoodieInstant, e);
+ LOG.error("Failed to perform previous clean operation, instant: "
+ hoodieInstant, e);
throw e;
}
}
@@ -278,4 +280,14 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
// This requires the CleanActionExecutor to be refactored as
BaseCommitActionExecutor
return cleanMetadataList.size() > 0 ?
cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
}
+
+ private void checkIfOtherWriterCommitted(HoodieInstant hoodieInstant,
HoodieIOException e) {
+ table.getMetaClient().reloadActiveTimeline();
+ if
(table.getCleanTimeline().filterCompletedInstants().containsInstant(hoodieInstant.getTimestamp()))
{
+ LOG.warn("Clean operation was completed by another writer for instant: "
+ hoodieInstant);
+ } else {
+ LOG.error("Failed to perform previous clean operation, instant: " +
hoodieInstant, e);
+ throw e;
+ }
+ }
}