[ https://issues.apache.org/jira/browse/HUDI-4736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raymond Xu updated HUDI-4736:
-----------------------------
Sprint: 2022/08/22, 2022/09/05 (was: 2022/08/22)
> Fix inflight clean action preventing clean service to continue when multiple cleans are not allowed
> ---------------------------------------------------------------------------------------------------
>
> Key: HUDI-4736
> URL: https://issues.apache.org/jira/browse/HUDI-4736
> Project: Apache Hudi
> Issue Type: Bug
> Components: cleaning
> Affects Versions: 0.11.0, 0.11.1
> Reporter: Ethan Guo
> Assignee: Ethan Guo
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.12.1
>
>
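> For Hudi Deltastreamer async cleaning, when the Spark job fails in the middle
> of cleaning and leaves the clean instant inflight, the retried Spark job does
> not resume the inflight clean action if `hoodie.clean.allow.multiple` is
> `false`, i.e., multiple clean schedules are disabled. This is due to a bug in
> the code below: with multiple cleans disabled, any requested or inflight clean
> instant makes the guard condition false, so the whole block, including the
> call to `table.clean(...)`, is skipped. The pending clean is never picked up,
> and every later clean attempt is skipped for the same reason.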
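A minimal, hypothetical model of the failure sequence may help. It is not Hudi code: `StuckCleanModel`, its fields, and `clean(instantTime, crash)` are invented for illustration. The guard mirrors the condition in `BaseHoodieWriteClient.clean(...)` quoted below, and the model assumes that a successful run completes every pending clean.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the guard in BaseHoodieWriteClient.clean(...).
// Not Hudi code: instants are strings, and "pending" stands in for the
// requested/inflight clean instants on the cleaner timeline.
class StuckCleanModel {
    final List<String> pending = new ArrayList<>();
    final List<String> completed = new ArrayList<>();
    final boolean allowMultipleCleans; // models hoodie.clean.allow.multiple

    StuckCleanModel(boolean allowMultipleCleans) {
        this.allowMultipleCleans = allowMultipleCleans;
    }

    // Same shape as the original condition:
    // allowMultipleCleans() || no requested/inflight clean instant exists.
    boolean guardPasses() {
        return allowMultipleCleans || pending.isEmpty();
    }

    // One clean attempt; crash = true simulates the Spark job failing after the
    // instant is scheduled but before it completes. Returns false when skipped.
    boolean clean(String instantTime, boolean crash) {
        if (!guardPasses()) {
            return false; // the whole block is skipped, including table.clean(...)
        }
        pending.add(instantTime); // scheduleInline: a new clean instant is requested
        if (crash) {
            return true; // job dies; the instant is left pending
        }
        // Model assumption: a successful run completes all pending cleans.
        completed.addAll(pending);
        pending.clear();
        return true;
    }

    public static void main(String[] args) {
        StuckCleanModel m = new StuckCleanModel(false);
        System.out.println(m.clean("001", true));  // true: scheduled, then crashed
        System.out.println(m.clean("002", false)); // false: skipped by the guard
        System.out.println(m.clean("003", false)); // false: skipped again
        System.out.println(m.pending + " " + m.completed); // [001] []
    }
}
```

With `allowMultipleCleans` set to `true`, the same sequence does not get stuck in this model, because the guard always passes and the leftover `001` is completed together with `002`.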
>
> Relevant logic in BaseHoodieWriteClient:
> {code:java}
> public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
>   if (!tableServicesEnabled(config)) {
>     return null;
>   }
>   final Timer.Context timerContext = metrics.getCleanCtx();
>   CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
>       HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
>   HoodieCleanMetadata metadata = null;
>   HoodieTable table = createTable(config, hadoopConf);
>   if (config.allowMultipleCleans()
>       || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
>     LOG.info("Cleaner started");
>     // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
>     if (scheduleInline) {
>       scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
>       table.getMetaClient().reloadActiveTimeline();
>     }
>     metadata = table.clean(context, cleanInstantTime, skipLocking);
>     if (timerContext != null && metadata != null) {
>       long durationMs = metrics.getDurationInMs(timerContext.stop());
>       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
>       LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
>           + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
>           + " cleanerElapsedMs" + durationMs);
>     }
>   }
>   return metadata;
> }
> {code}
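One possible way to restructure this logic, sketched under the same simplifying assumptions as a hypothetical model (not the change actually made for HUDI-4736): always resume pending cleans, and let `hoodie.clean.allow.multiple` gate only the scheduling of a new clean instant. `ResumingCleanModel` and its methods are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the actual Hudi patch. Pending cleans are always
// resumed; only scheduling a *new* clean instant is gated by allowMultipleCleans.
class ResumingCleanModel {
    final List<String> pending = new ArrayList<>();   // requested/inflight cleans
    final List<String> completed = new ArrayList<>(); // finished cleans
    final boolean allowMultipleCleans;                // models hoodie.clean.allow.multiple

    ResumingCleanModel(boolean allowMultipleCleans) {
        this.allowMultipleCleans = allowMultipleCleans;
    }

    // Returns true if a new clean instant was scheduled on this attempt.
    boolean clean(String instantTime, boolean crash) {
        boolean scheduled = false;
        // Gate only the scheduling step, not the execution of pending cleans.
        if (allowMultipleCleans || pending.isEmpty()) {
            pending.add(instantTime);
            scheduled = true;
        }
        if (crash) {
            return scheduled; // job dies; whatever is pending stays pending
        }
        // Always execute pending cleans, so a leftover inflight instant is resumed.
        completed.addAll(pending);
        pending.clear();
        return scheduled;
    }

    public static void main(String[] args) {
        ResumingCleanModel m = new ResumingCleanModel(false);
        m.clean("001", true);                              // scheduled, then crashed
        System.out.println(m.clean("002", false));         // false: 001 was still pending
        System.out.println(m.pending + " " + m.completed); // [] [001]
        System.out.println(m.clean("003", false));         // true: cleaning continues
        System.out.println(m.pending + " " + m.completed); // [] [001, 003]
    }
}
```

Whether a retry should also schedule a fresh instant right after resuming the leftover one is a separate design choice; this sketch defers the new instant to the next attempt.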
--
This message was sent by Atlassian Jira
(v8.20.10#820010)