kbuci commented on code in PR #18337:
URL: https://github.com/apache/hudi/pull/18337#discussion_r2955267325


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -214,15 +238,52 @@ protected Option<HoodieCleanerPlan> requestClean() {
     }
     final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext);
     Option<HoodieCleanerPlan> option = Option.empty();
-    if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
-        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
+    if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty())
+        || (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
+        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) {
       // Only create cleaner plan which does some work
       option = Option.of(cleanerPlan);
     }
+    // If cleaner plan returned an empty list, incremental clean is enabled and there was no
+    // completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN,
+    // create a dummy clean to avoid full scan in the future.
+    // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes
+    // with an empty list of files to be cleaned.  CleanActionExecutor would never be invoked for this dataset.
+    // To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here.
+    if (config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxDurationToCreateEmptyCleanMs() > 0) {
+      // Only create an empty clean commit if earliestInstantToRetain is present in the plan
+      boolean eligibleForEmptyCleanCommit = true;
 
+      // if there is no previous clean instant or the previous clean instant was before the configured max duration, schedule an empty clean commit
+      Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().lastInstant();
+      if (lastCleanInstant.isPresent()) {
+        try {
+          ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
+          long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli();
+          long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli();
+          eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > config.maxDurationToCreateEmptyCleanMs();
+        } catch (ParseException e) {
+          log.warn("Unable to parse last clean commit time", e);
+        }
+      }
+      if (eligibleForEmptyCleanCommit) {
+        log.warn("Creating an empty clean instant with earliestCommitToRetain of {}", cleanerPlan.getEarliestInstantToRetain().getTimestamp());
+        return Option.of(cleanerPlan);

Review Comment:
   Are we ensuring that the ECTR (earliest commit to retain) of the new empty clean is always >= the ECTR of the latest clean?
   For context, our org's internal implementation initially had a bug where:
   1. `T1.clean` is completed
   2. cleaner commits retained is increased
   3. empty clean `T2.clean` is completed, but with an ECTR that is before T1's ECTR
   
   So I wanted to make sure this isn't an issue here as well.
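
A minimal sketch of such a guard, under stated assumptions: instant times are fixed-width timestamp strings in a single format, so lexicographic order matches chronological order. `EctrGuard` and `isEctrMonotonic` are hypothetical names for illustration, not Hudi APIs, and how the previous clean's ECTR is obtained is left out.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Hudi.
final class EctrGuard {
  private EctrGuard() {
  }

  /**
   * Returns true when the planned ECTR does not move backwards relative to the
   * ECTR recorded by the latest completed clean. With no previous clean there
   * is nothing to compare against, so the planned ECTR is accepted.
   *
   * Assumption: both values are equal-length timestamp strings in the same
   * format, so String.compareTo matches chronological order.
   */
  static boolean isEctrMonotonic(Optional<String> lastCleanEctr, String plannedEctr) {
    return lastCleanEctr
        .map(last -> plannedEctr.compareTo(last) >= 0)
        .orElse(true);
  }
}
```

With a check like this, the scenario above (T2's ECTR earlier than T1's) would be rejected before the empty clean is scheduled, rather than discovered after it completes.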



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -214,15 +238,52 @@ protected Option<HoodieCleanerPlan> requestClean() {
     }
     final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext);
     Option<HoodieCleanerPlan> option = Option.empty();
-    if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
-        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
+    if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty())
+        || (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
+        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) {
       // Only create cleaner plan which does some work
       option = Option.of(cleanerPlan);
     }
+    // If cleaner plan returned an empty list, incremental clean is enabled and there was no
+    // completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN,
+    // create a dummy clean to avoid full scan in the future.
+    // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes
+    // with an empty list of files to be cleaned.  CleanActionExecutor would never be invoked for this dataset.
+    // To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here.
+    if (config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxDurationToCreateEmptyCleanMs() > 0) {
+      // Only create an empty clean commit if earliestInstantToRetain is present in the plan
+      boolean eligibleForEmptyCleanCommit = true;
 
+      // if there is no previous clean instant or the previous clean instant was before the configured max duration, schedule an empty clean commit
+      Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().lastInstant();
+      if (lastCleanInstant.isPresent()) {
+        try {
+          ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
+          long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli();
+          long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli();
+          eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > config.maxDurationToCreateEmptyCleanMs();
+        } catch (ParseException e) {
+          log.warn("Unable to parse last clean commit time", e);

Review Comment:
   Shouldn't we let the exception be re-thrown, since the user expects that creation of an empty clean will be attempted?
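
A minimal sketch of the rethrow alternative, under stated assumptions: `LastCleanTimeParser` and `parseOrThrow` are hypothetical names, the `yyyyMMddHHmmssSSS` pattern is an assumed instant-time format, and `IllegalStateException` is only a stand-in for whichever unchecked exception the project would prefer. It uses plain JDK classes, not Hudi's parser.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical helper for illustration only; not part of Hudi.
final class LastCleanTimeParser {
  private LastCleanTimeParser() {
  }

  /**
   * Parses an instant-time string to epoch milliseconds (in the JVM default
   * time zone) and surfaces a parse failure to the caller instead of logging
   * a warning and continuing.
   *
   * Assumption: the instant time uses the pattern yyyyMMddHHmmssSSS.
   */
  static long parseOrThrow(String instantTime) {
    SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmssSSS");
    format.setLenient(false);
    try {
      return format.parse(instantTime).getTime();
    } catch (ParseException e) {
      // Rethrow as unchecked, keeping the original exception as the cause.
      throw new IllegalStateException(
          "Unable to parse last clean instant time: " + instantTime, e);
    }
  }
}
```

Failing fast this way makes an unparseable clean instant visible to the caller, whereas the warn-and-continue path leaves `eligibleForEmptyCleanCommit` at its default of `true`.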



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]