nsivabalan commented on code in PR #18306:
URL: https://github.com/apache/hudi/pull/18306#discussion_r2933930511


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,84 @@ private Option<Pair<Integer, String>> 
getLatestDeltaCommitInfoSinceLastCompactio
     return Option.empty();
   }
 
+  private Option<Pair<Integer, String>> getDeltaCommitInfoSinceLogCompaction() 
{
+    HoodieActiveTimeline rawActiveTimeline = 
table.getMetaClient().getTableFormat()
+        .getTimelineFactory().createActiveTimeline(table.getMetaClient(), 
false);
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestLogCompaction(
+            table.getActiveTimeline().getDeltaCommitTimeline(), 
rawActiveTimeline);
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().requestedTime()));
+    }
+    return Option.empty();
+  }
+
   private boolean needCompact(CompactionTriggerStrategy 
compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = 
getLatestDeltaCommitInfo();
-    if (!latestDeltaCommitInfoOption.isPresent()) {
+    Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption = 
getLatestDeltaCommitInfoSinceCompaction();
+    if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
       return false;
     }
-    Pair<Integer, String> latestDeltaCommitInfo = 
latestDeltaCommitInfoOption.get();
+    Pair<Integer, String> latestDeltaCommitInfoSinceCompact = 
latestDeltaCommitInfoSinceCompactOption.get();
     if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
-      return true;
+      // Log compaction schedule is triggered based on 
getLogCompactionBlocksThreshold value.
+      // One deltacommit can create either one or more than one block 
depending on the size of the write batch.
+      // In the worst case it would require approximately equal no. of 
deltacommits to reach the LogCompactionBlocksThreshold value.
+      // Each logcompaction create one or more blocks, and transient failures 
and retries can cause the number of blocks to
+      // exceed the LogCompactionBlocksThreshold value before the next time 
log compaction scheduling is attempted.
+      // As a result, LogCompactionBlocksThreshold is treated as a threshold, 
where if the approximate number of deltacommits
+      // since the last compaction and log compaction meets this threshold, 
then log compaction should be scheduled.
+      Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption 
= getDeltaCommitInfoSinceLogCompaction();
+      int numDeltaCommitsSinceLatestCompaction = 
latestDeltaCommitInfoSinceCompact.getLeft();
+      int numDeltaCommitsSinceLatestLogCompaction = 
latestDeltaCommitInfoSinceLogCompactOption.isPresent()
+          ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
+          : 0;
+
+      int numDeltaCommitsSinceLatestCompactionOrLogCompaction = 
Math.min(numDeltaCommitsSinceLatestCompaction, 
numDeltaCommitsSinceLatestLogCompaction);
+      log.info("There have been {} delta commits since last compaction or log 
compaction.", numDeltaCommitsSinceLatestCompactionOrLogCompaction);

Review Comment:
   Let's not add info logging every time.
   If we are about to return `true` from here, info logging is OK;
   otherwise it might be noisy.
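
   A rough sketch of what that could look like, not a drop-in change. The class
   `LogCompactionLoggingSketch`, the method `shouldScheduleLogCompaction`, and
   the `threshold` parameter are hypothetical names, the `>=` comparison is an
   assumption about how the threshold is checked, and `java.util.logging` is
   used only to keep the snippet self-contained (the PR would keep using its
   existing `log` field).

```java
import java.util.logging.Logger;

class LogCompactionLoggingSketch {
  private static final Logger LOG =
      Logger.getLogger(LogCompactionLoggingSketch.class.getName());

  // Hypothetical helper: decides whether log compaction should be scheduled
  // and logs at info level only when the answer is yes.
  // Assumption: "meets the threshold" means count >= threshold.
  public static boolean shouldScheduleLogCompaction(int numDeltaCommitsSince, int threshold) {
    boolean shouldSchedule = numDeltaCommitsSince >= threshold;
    if (shouldSchedule) {
      // This path runs only when log compaction is actually scheduled.
      LOG.info(String.format(
          "Scheduling log compaction: %d delta commits since last compaction or log compaction (threshold %d).",
          numDeltaCommitsSince, threshold));
    } else {
      // Below the threshold: keep the message out of the info stream.
      LOG.fine(String.format(
          "Skipping log compaction: %d delta commits, threshold %d.",
          numDeltaCommitsSince, threshold));
    }
    return shouldSchedule;
  }
}
```

   With this shape, the frequent "not yet" calls only show up when debug-level
   logging is enabled.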



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,84 @@ private Option<Pair<Integer, String>> 
getLatestDeltaCommitInfoSinceLastCompactio
     return Option.empty();
   }
 
+  private Option<Pair<Integer, String>> getDeltaCommitInfoSinceLogCompaction() 
{

Review Comment:
   Shouldn't this be `getLatestDeltaCommitInfoSinceLogCompaction` (to align the
naming with the existing method)?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest 
completed
+   * log compaction delta commit, the completed log compaction commit 
instant), if the latest completed
+   * log compaction commit is present; a pair of (timeline containing all the 
delta commits,
+   * the first delta commit instant), if there is no completed log compaction 
commit.
+   *
+   * @param deltaCommitTimeline Active timeline of table that contains only 
delta commits.
+   * @param rawActiveTimeline Active timeline of table, that has current and 
previous states of each instant.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLatestLogCompaction(
+      final HoodieTimeline deltaCommitTimeline,
+      final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOption = 
Option.fromJavaOptional(
+        rawActiveTimeline
+            .filterPendingLogCompactionTimeline()
+            .getReverseOrderedInstants()
+            .findFirst()
+    );
+
+    if (lastLogCompactionInstantOption.isPresent()) {
+      // Search for a corresponding completed delta commit for the latest log 
compact instant observed.
+      // If a delta commit is found, then that means the last compact instant 
was completed.
+      // Otherwise return empty, since that means there is a pending log 
compaction that has not
+      // been executed yet. The latter scenario should not happen in practice, 
as log compaction
+      // scheduling is only done after previous log compact pending instants 
have been executed or
+      // rolled back.
+      String lastLogCompactionTimestamp = 
lastLogCompactionInstantOption.get().requestedTime();
+      Option<HoodieInstant> lastCompletedLogCompactionInstantOption = 
Option.fromJavaOptional(
+          deltaCommitTimeline
+              .filterCompletedInstants()
+              .filter(hoodieInstant -> 
hoodieInstant.requestedTime().equals(lastLogCompactionTimestamp))
+              .getInstantsAsStream()
+              .findFirst()
+      );
+      if (lastCompletedLogCompactionInstantOption.isPresent()) {
+        HoodieInstant lastCompletedLogCompactionInstant = 
lastCompletedLogCompactionInstantOption.get();
+        return Option.of(Pair.of(deltaCommitTimeline.findInstantsAfter(
+            lastCompletedLogCompactionInstant.requestedTime(), 
Integer.MAX_VALUE), lastCompletedLogCompactionInstant));
+      } else {
+        LOG.info("Last log compaction instant {}, is in pending state so 
returning empty value.", lastLogCompactionTimestamp);

Review Comment:
   Let's be judicious with info logging.
   Can you confirm this is logged only occasionally?
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,84 @@ private Option<Pair<Integer, String>> 
getLatestDeltaCommitInfoSinceLastCompactio
     return Option.empty();
   }
 
+  private Option<Pair<Integer, String>> getDeltaCommitInfoSinceLogCompaction() 
{
+    HoodieActiveTimeline rawActiveTimeline = 
table.getMetaClient().getTableFormat()
+        .getTimelineFactory().createActiveTimeline(table.getMetaClient(), 
false);
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestLogCompaction(
+            table.getActiveTimeline().getDeltaCommitTimeline(), 
rawActiveTimeline);
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().requestedTime()));
+    }
+    return Option.empty();
+  }
+
   private boolean needCompact(CompactionTriggerStrategy 
compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = 
getLatestDeltaCommitInfo();
-    if (!latestDeltaCommitInfoOption.isPresent()) {
+    Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption = 
getLatestDeltaCommitInfoSinceCompaction();
+    if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
       return false;
     }
-    Pair<Integer, String> latestDeltaCommitInfo = 
latestDeltaCommitInfoOption.get();
+    Pair<Integer, String> latestDeltaCommitInfoSinceCompact = 
latestDeltaCommitInfoSinceCompactOption.get();
     if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
-      return true;
+      // Log compaction schedule is triggered based on 
getLogCompactionBlocksThreshold value.
+      // One deltacommit can create either one or more than one block 
depending on the size of the write batch.

Review Comment:
   Let's move this to a private method and keep this method leaner.
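
   For illustration only, the extracted logic could look roughly like the
   sketch below. `DeltaCommitCountSketch`, `numDeltaCommitsSince`, and
   `needLogCompact` are hypothetical names, `java.util.Optional` stands in for
   Hudi's `Option`, and the `>=` comparison against the blocks threshold is an
   assumption. The `Math.min(...)` with a default of 0 mirrors what the diff
   currently does.

```java
import java.util.Optional;

class DeltaCommitCountSketch {
  // Mirrors the Math.min(...) in the diff: when no count since the last log
  // compaction is available, 0 is used, so the result is 0 in that case.
  public static int numDeltaCommitsSince(int sinceCompaction, Optional<Integer> sinceLogCompaction) {
    return Math.min(sinceCompaction, sinceLogCompaction.orElse(0));
  }

  // Hypothetical extracted check that needCompact(...) could delegate to.
  // Assumption: log compaction is scheduled once the count reaches the threshold.
  public static boolean needLogCompact(int sinceCompaction,
                                       Optional<Integer> sinceLogCompaction,
                                       int blocksThreshold) {
    return numDeltaCommitsSince(sinceCompaction, sinceLogCompaction) >= blocksThreshold;
  }
}
```

   `needCompact` would then only need a single call in the `LOG_COMPACT`
   branch, and the long explanatory comment can live on the helper.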
   



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest 
completed
+   * log compaction delta commit, the completed log compaction commit 
instant), if the latest completed
+   * log compaction commit is present; a pair of (timeline containing all the 
delta commits,
+   * the first delta commit instant), if there is no completed log compaction 
commit.
+   *
+   * @param deltaCommitTimeline Active timeline of table that contains only 
delta commits.
+   * @param rawActiveTimeline Active timeline of table, that has current and 
previous states of each instant.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLatestLogCompaction(
+      final HoodieTimeline deltaCommitTimeline,
+      final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOption = 
Option.fromJavaOptional(
+        rawActiveTimeline
+            .filterPendingLogCompactionTimeline()
+            .getReverseOrderedInstants()
+            .findFirst()
+    );
+
+    if (lastLogCompactionInstantOption.isPresent()) {
+      // Search for a corresponding completed delta commit for the latest log 
compact instant observed.
+      // If a delta commit is found, then that means the last compact instant 
was completed.
+      // Otherwise return empty, since that means there is a pending log 
compaction that has not
+      // been executed yet. The latter scenario should not happen in practice, 
as log compaction
+      // scheduling is only done after previous log compact pending instants 
have been executed or
+      // rolled back.
+      String lastLogCompactionTimestamp = 
lastLogCompactionInstantOption.get().requestedTime();
+      Option<HoodieInstant> lastCompletedLogCompactionInstantOption = 
Option.fromJavaOptional(
+          deltaCommitTimeline
+              .filterCompletedInstants()
+              .filter(hoodieInstant -> 
hoodieInstant.requestedTime().equals(lastLogCompactionTimestamp))
+              .getInstantsAsStream()
+              .findFirst()
+      );
+      if (lastCompletedLogCompactionInstantOption.isPresent()) {
+        HoodieInstant lastCompletedLogCompactionInstant = 
lastCompletedLogCompactionInstantOption.get();
+        return Option.of(Pair.of(deltaCommitTimeline.findInstantsAfter(

Review Comment:
   Should we use `findInstantsModifiedAfterByCompletionTime`?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest 
completed
+   * log compaction delta commit, the completed log compaction commit 
instant), if the latest completed
+   * log compaction commit is present; a pair of (timeline containing all the 
delta commits,
+   * the first delta commit instant), if there is no completed log compaction 
commit.
+   *
+   * @param deltaCommitTimeline Active timeline of table that contains only 
delta commits.
+   * @param rawActiveTimeline Active timeline of table, that has current and 
previous states of each instant.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLatestLogCompaction(

Review Comment:
   Suggest renaming this to `getDeltaCommitsSinceLatestCompletedLogCompaction`.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest 
completed
+   * log compaction delta commit, the completed log compaction commit 
instant), if the latest completed
+   * log compaction commit is present; a pair of (timeline containing all the 
delta commits,
+   * the first delta commit instant), if there is no completed log compaction 
commit.
+   *
+   * @param deltaCommitTimeline Active timeline of table that contains only 
delta commits.
+   * @param rawActiveTimeline Active timeline of table, that has current and 
previous states of each instant.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLatestLogCompaction(
+      final HoodieTimeline deltaCommitTimeline,
+      final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOption = 
Option.fromJavaOptional(
+        rawActiveTimeline
+            .filterPendingLogCompactionTimeline()

Review Comment:
   How about we introduce `filterLogCompactionTimeline()`
   and then process the latest instant from it?
   That way we can avoid polling the timeline twice, right?
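
   To make the idea concrete, here is a simplified, self-contained model, not
   Hudi code. `LogCompactionTimelineSketch`, its nested `Instant` class, and
   `latestLogCompaction` are all hypothetical, and the model assumes each
   instant carries its own completion flag, which may not match how the real
   timeline represents a completed log compaction.

```java
import java.util.List;
import java.util.Optional;

class LogCompactionTimelineSketch {
  // Hypothetical, simplified stand-in for a timeline instant.
  public static final class Instant {
    public final String requestedTime;
    public final String action;
    public final boolean completed;

    public Instant(String requestedTime, String action, boolean completed) {
      this.requestedTime = requestedTime;
      this.action = action;
      this.completed = completed;
    }
  }

  // Scans the instants once and returns the latest log compaction instant,
  // whether pending or completed. The caller inspects `completed` instead of
  // doing a second lookup on another timeline.
  public static Optional<Instant> latestLogCompaction(List<Instant> timeline) {
    Instant latest = null;
    for (Instant instant : timeline) {
      boolean isLogCompaction = "logcompaction".equals(instant.action);
      if (isLogCompaction
          && (latest == null || instant.requestedTime.compareTo(latest.requestedTime) > 0)) {
        latest = instant;
      }
    }
    return Optional.ofNullable(latest);
  }
}
```

   If the latest instant is present and `completed` is false, the caller can
   return empty right away, which is the pending case the current code handles
   with the second filter.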
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,84 @@ private Option<Pair<Integer, String>> 
getLatestDeltaCommitInfoSinceLastCompactio
     return Option.empty();
   }
 
+  private Option<Pair<Integer, String>> getDeltaCommitInfoSinceLogCompaction() 
{
+    HoodieActiveTimeline rawActiveTimeline = 
table.getMetaClient().getTableFormat()
+        .getTimelineFactory().createActiveTimeline(table.getMetaClient(), 
false);
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestLogCompaction(
+            table.getActiveTimeline().getDeltaCommitTimeline(), 
rawActiveTimeline);
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().requestedTime()));
+    }
+    return Option.empty();
+  }
+
   private boolean needCompact(CompactionTriggerStrategy 
compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = 
getLatestDeltaCommitInfo();
-    if (!latestDeltaCommitInfoOption.isPresent()) {
+    Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption = 
getLatestDeltaCommitInfoSinceCompaction();
+    if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
       return false;
     }
-    Pair<Integer, String> latestDeltaCommitInfo = 
latestDeltaCommitInfoOption.get();
+    Pair<Integer, String> latestDeltaCommitInfoSinceCompact = 
latestDeltaCommitInfoSinceCompactOption.get();
     if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
-      return true;
+      // Log compaction schedule is triggered based on 
getLogCompactionBlocksThreshold value.
+      // One deltacommit can create either one or more than one block 
depending on the size of the write batch.
+      // In the worst case it would require approximately equal no. of 
deltacommits to reach the LogCompactionBlocksThreshold value.
+      // Each logcompaction create one or more blocks, and transient failures 
and retries can cause the number of blocks to
+      // exceed the LogCompactionBlocksThreshold value before the next time 
log compaction scheduling is attempted.
+      // As a result, LogCompactionBlocksThreshold is treated as a threshold, 
where if the approximate number of deltacommits
+      // since the last compaction and log compaction meets this threshold, 
then log compaction should be scheduled.
+      Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption 
= getDeltaCommitInfoSinceLogCompaction();
+      int numDeltaCommitsSinceLatestCompaction = 
latestDeltaCommitInfoSinceCompact.getLeft();
+      int numDeltaCommitsSinceLatestLogCompaction = 
latestDeltaCommitInfoSinceLogCompactOption.isPresent()
+          ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
+          : 0;
+
+      int numDeltaCommitsSinceLatestCompactionOrLogCompaction = 
Math.min(numDeltaCommitsSinceLatestCompaction, 
numDeltaCommitsSinceLatestLogCompaction);

Review Comment:
   This name is too long.
   Can we just use `numDeltaCommitSince`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
