This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9af7f29ceef7 perf(table-services): Only attempt scheduling log 
compaction if number of deltacommits is at least LogCompactionBlocksThreshold 
(#18306)
9af7f29ceef7 is described below

commit 9af7f29ceef7f15e582773789c94e03b32a470be
Author: Krishen <[email protected]>
AuthorDate: Wed Apr 1 11:32:46 2026 -0700

    perf(table-services): Only attempt scheduling log compaction if number of 
deltacommits is at least LogCompactionBlocksThreshold (#18306)
    
    Currently, log compaction is always scheduled whenever the operation type 
is LOG_COMPACT, regardless of how many delta commits have occurred since the 
last log compaction. This leads to unnecessary log compaction scheduling, 
wasting resources when only a few delta commits (and therefore most likely only 
a few log files/blocks) have accumulated. This change is especially helpful for 
the metadata table with RLI, where all file groups in RLI are updated with new 
log files/blocks at a roughly equal  [...]
    
    Summary and Changelog
    Changes log compaction scheduling to use the LogCompactionBlocksThreshold 
config as a gating threshold. Instead of unconditionally scheduling log 
compaction, the scheduler now counts the delta commits since the last compaction 
and since the last completed log compaction, takes the minimum of the two, and 
only schedules log compaction when that count meets or exceeds the threshold.
    
    Added CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction() 
(backed by the new helper CompactionUtils.getLastLogCompaction()), which returns 
the delta commits since the most recent completed log compaction by inspecting 
the raw active timeline (needed because completed log compaction instants 
transition from LOG_COMPACTION_ACTION to DELTA_COMMIT_ACTION)
    Added 
ScheduleCompactionActionExecutor.getLatestDeltaCommitInfoSinceLogCompaction(), 
which creates a raw active timeline and delegates to the new CompactionUtils method
    Renamed getLatestDeltaCommitInfo() to 
getLatestDeltaCommitInfoSinceCompaction() for clarity
    Updated needCompact() to replace the unconditional return true for 
LOG_COMPACT with a call to the new needLogCompact(), which applies 
threshold-based logic: Math.min(deltaCommitsSinceCompaction, 
deltaCommitsSinceLogCompaction) >= logCompactionBlocksThreshold
    Added unit tests for getDeltaCommitsSinceLatestCompletedLogCompaction and 
getLastLogCompaction covering completed, pending, and absent log compactions and 
empty timelines, plus threshold-based scheduling scenarios
    
    
    
    ---------
    
    Co-authored-by: Krishen Bhan <[email protected]>
---
 .../compact/ScheduleCompactionActionExecutor.java  |  65 ++++-
 .../apache/hudi/common/util/CompactionUtils.java   |  65 +++++
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |   2 +-
 .../hudi/common/util/TestCompactionUtils.java      | 315 +++++++++++++++++++++
 4 files changed, 431 insertions(+), 16 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index b3433daa7e5c..298beb15b7e0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -138,7 +139,7 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> 
extends BaseTableServi
     return new HoodieCompactionPlan();
   }
 
-  private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
+  private Option<Pair<Integer, String>> 
getLatestDeltaCommitInfoSinceCompaction() {
     Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
         
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
     if (deltaCommitsInfo.isPresent()) {
@@ -160,55 +161,69 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> 
extends BaseTableServi
     return Option.empty();
   }
 
+  private Option<Pair<Integer, String>> 
getLatestDeltaCommitInfoSinceLogCompaction() {
+    HoodieActiveTimeline rawActiveTimeline = 
table.getMetaClient().getTableFormat()
+        .getTimelineFactory().createActiveTimeline(table.getMetaClient(), 
false);
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+            table.getActiveTimeline().getDeltaCommitTimeline(), 
rawActiveTimeline);
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().requestedTime()));
+    }
+    return Option.empty();
+  }
+
   private boolean needCompact(CompactionTriggerStrategy 
compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = 
getLatestDeltaCommitInfo();
-    if (!latestDeltaCommitInfoOption.isPresent()) {
+    Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption = 
getLatestDeltaCommitInfoSinceCompaction();
+    if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
       return false;
     }
-    Pair<Integer, String> latestDeltaCommitInfo = 
latestDeltaCommitInfoOption.get();
+    Pair<Integer, String> latestDeltaCommitInfoSinceCompact = 
latestDeltaCommitInfoSinceCompactOption.get();
     if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
-      return true;
+      return needLogCompact(latestDeltaCommitInfoSinceCompact);
     }
     int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
     int inlineCompactDeltaSecondsMax = 
config.getInlineCompactDeltaSecondsMax();
     switch (compactionTriggerStrategy) {
       case NUM_COMMITS:
-        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfo.getLeft();
+        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfoSinceCompact.getLeft();
         if (compactable) {
           log.info("The delta commits >= {}, trigger compaction scheduler.", 
inlineCompactDeltaCommitMax);
         }
         break;
       case NUM_COMMITS_AFTER_LAST_REQUEST:
-        latestDeltaCommitInfoOption = 
getLatestDeltaCommitInfoSinceLastCompactionRequest();
+        latestDeltaCommitInfoSinceCompactOption = 
getLatestDeltaCommitInfoSinceLastCompactionRequest();
 
-        if (!latestDeltaCommitInfoOption.isPresent()) {
+        if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
           return false;
         }
-        latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
-        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfo.getLeft();
+        latestDeltaCommitInfoSinceCompact = 
latestDeltaCommitInfoSinceCompactOption.get();
+        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfoSinceCompact.getLeft();
         if (compactable) {
           log.info("The delta commits >= {} since the last compaction request, 
trigger compaction scheduler.", inlineCompactDeltaCommitMax);
         }
         break;
       case TIME_ELAPSED:
-        compactable = inlineCompactDeltaSecondsMax <= 
parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfo.getRight());
+        compactable = inlineCompactDeltaSecondsMax <= 
parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfoSinceCompact.getRight());
         if (compactable) {
           log.info("The elapsed time >={}s, trigger compaction scheduler.", 
inlineCompactDeltaSecondsMax);
         }
         break;
       case NUM_OR_TIME:
-        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfo.getLeft()
-            || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfo.getRight());
+        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfoSinceCompact.getLeft()
+            || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfoSinceCompact.getRight());
         if (compactable) {
           log.info("The delta commits >= {} or elapsed_time >={}s, trigger 
compaction scheduler.", inlineCompactDeltaCommitMax,
               inlineCompactDeltaSecondsMax);
         }
         break;
       case NUM_AND_TIME:
-        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfo.getLeft()
-            && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfo.getRight());
+        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfoSinceCompact.getLeft()
+            && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfoSinceCompact.getRight());
         if (compactable) {
           log.info("The delta commits >= {} and elapsed_time >={}s, trigger 
compaction scheduler.", inlineCompactDeltaCommitMax,
               inlineCompactDeltaSecondsMax);
@@ -220,6 +235,26 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> 
extends BaseTableServi
     return compactable;
   }
 
+  /**
+   * Determines whether log compaction should be scheduled based on the number 
of delta commits
+   * since the last compaction and the last log compaction, compared against 
the
+   * {@code hoodie.log.compaction.blocks.threshold} config.
+   */
+  private boolean needLogCompact(Pair<Integer, String> 
latestDeltaCommitInfoSinceCompact) {
+    Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = 
getLatestDeltaCommitInfoSinceLogCompaction();
+    int numDeltaCommitsSinceLatestCompaction = 
latestDeltaCommitInfoSinceCompact.getLeft();
+    int numDeltaCommitsSinceLatestLogCompaction = 
latestDeltaCommitInfoSinceLogCompactOption.isPresent()
+        ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
+        : 0;
+
+    int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, 
numDeltaCommitsSinceLatestLogCompaction);
+    boolean shouldLogCompact = numDeltaCommitsSince >= 
config.getLogCompactionBlocksThreshold();
+    if (shouldLogCompact) {
+      log.info("There have been {} delta commits since last compaction or log 
compaction, triggering log compaction.", numDeltaCommitsSince);
+    }
+    return shouldLogCompact;
+  }
+
   private Long parsedToSeconds(String time) {
     return TimelineUtils.parseDateFromInstantTimeSafely(time).orElseThrow(() 
-> new HoodieCompactionException("Failed to parse timestamp " + time))
             .getTime() / 1000;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 0123274b42ef..f1a7b2b77c9d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -34,6 +34,9 @@ import 
org.apache.hudi.common.table.timeline.versioning.compaction.CompactionV2M
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
@@ -49,6 +52,7 @@ import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deseri
  * Helper class to generate compaction plan from FileGroup/FileSlice 
abstraction.
  */
 public class CompactionUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionUtils.class);
 
   public static final Integer COMPACTION_METADATA_VERSION_1 = 
CompactionV1MigrationHandler.VERSION;
   public static final Integer COMPACTION_METADATA_VERSION_2 = 
CompactionV2MigrationHandler.VERSION;
@@ -363,6 +367,67 @@ public class CompactionUtils {
     }
   }
 
+  /**
+   * Returns the most recent log compaction instant from the raw active 
timeline.
+   *
+   * @param rawActiveTimeline Active timeline of table, that has current and 
previous states of each instant.
+   * @return The latest log compaction instant, or empty if no log compaction 
is present.
+   */
+  public static Option<HoodieInstant> getLastLogCompaction(final 
HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOpt = 
Option.fromJavaOptional(
+        rawActiveTimeline
+            .filterPendingLogCompactionTimeline()
+            .getReverseOrderedInstants()
+            .findFirst()
+    );
+    if (!lastLogCompactionInstantOpt.isPresent()) {
+      return Option.empty();
+    }
+    String logCompactionTimestamp = 
lastLogCompactionInstantOpt.get().requestedTime();
+    Option<HoodieInstant> completedInstantOpt = Option.fromJavaOptional(
+        rawActiveTimeline
+            .findInstantsInClosedRange(logCompactionTimestamp, 
logCompactionTimestamp)
+            .getInstantsAsStream()
+            .filter(HoodieInstant::isCompleted)
+            .findFirst()
+    );
+    return completedInstantOpt.isPresent() ? completedInstantOpt : 
lastLogCompactionInstantOpt;
+  }
+
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest 
completed
+   * log compaction delta commit, the completed log compaction commit 
instant), if the latest completed
+   * log compaction commit is present; a pair of (timeline containing all the 
delta commits,
+   * the first delta commit instant), if there is no completed log compaction 
commit.
+   *
+   * @param deltaCommitTimeline Active timeline of table that contains only 
delta commits.
+   * @param rawActiveTimeline Active timeline of table, that has current and 
previous states of each instant.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLatestCompletedLogCompaction(
+      final HoodieTimeline deltaCommitTimeline,
+      final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionOpt = 
getLastLogCompaction(rawActiveTimeline);
+
+    if (lastLogCompactionOpt.isPresent()) {
+      HoodieInstant lastLogCompaction = lastLogCompactionOpt.get();
+      if (lastLogCompaction.isCompleted()) {
+        return 
Option.of(Pair.of(deltaCommitTimeline.findInstantsModifiedAfterByCompletionTime(
+            lastLogCompaction.requestedTime()), lastLogCompaction));
+      } else {
+        LOG.info("Last log compaction instant {} is in pending state, 
returning empty value.", lastLogCompaction.requestedTime());
+        return Option.empty();
+      }
+    } else {
+      Option<HoodieInstant> firstDeltaCommitInstantOption = 
deltaCommitTimeline.firstInstant();
+      if (firstDeltaCommitInstantOption.isPresent()) {
+        return Option.of(Pair.of(deltaCommitTimeline, 
firstDeltaCommitInstantOption.get()));
+      } else {
+        return Option.empty();
+      }
+    }
+  }
+
   /**
    * Gets the earliest instant to retain for MOR compaction.
    * If there is no completed compaction,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 9bf56a9dc4d4..a4e869428c24 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -961,7 +961,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
     if (mdtCompactionEnabled) {
       if (isLogCompaction) {
         
conf.setString(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(),
 "true");
-        
conf.setString(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "2");
+        
conf.setString(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "1");
       } else {
         conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 1);
       }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index ee9bb2f37a30..ac7dac4eb429 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -62,6 +62,7 @@ import static 
org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
 import static 
org.apache.hudi.common.util.CompactionUtils.COMPACTION_METADATA_VERSION_1;
 import static 
org.apache.hudi.common.util.CompactionUtils.LATEST_COMPACTION_METADATA_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -349,6 +350,320 @@ public class TestCompactionUtils extends 
HoodieCommonTestHarness {
     assertEquals(Option.empty(), 
CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline));
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testGetDeltaCommitsSinceLatestCompletedLogCompaction(boolean 
hasCompletedLogCompaction) {
+    if (hasCompletedLogCompaction) {
+      // Delta commit timeline: completed delta commits 01-05, 07-08, plus 
inflight 09
+      HoodieActiveTimeline deltaCommitTimeline = new MockHoodieActiveTimeline(
+          Stream.of(
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+              INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+              .collect(Collectors.toList()));
+
+      // Raw timeline includes a log compaction instant at "05" (inflight) 
plus inflight delta commit "09"
+      HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+          Stream.of(
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+              INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, "05"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+              INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+              .collect(Collectors.toList()));
+
+      Pair<HoodieTimeline, HoodieInstant> actual =
+          CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+              deltaCommitTimeline.getDeltaCommitTimeline(), 
rawActiveTimeline).get();
+      assertEquals(
+          Stream.of(
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+              .collect(Collectors.toList()),
+          actual.getLeft().getInstants());
+      assertEquals(
+          INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+          actual.getRight());
+    } else {
+      // No log compaction instants: raw timeline is same as delta commit 
timeline
+      HoodieActiveTimeline deltaCommitTimeline = new MockHoodieActiveTimeline(
+          Stream.of(
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+              
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+              INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+              .collect(Collectors.toList()));
+      HoodieActiveTimeline rawActiveTimeline = deltaCommitTimeline;
+
+      Pair<HoodieTimeline, HoodieInstant> actual =
+          CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+              deltaCommitTimeline.getDeltaCommitTimeline(), 
rawActiveTimeline).get();
+      assertEquals(
+          Stream.of(
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+                  
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+              .collect(Collectors.toList()),
+          actual.getLeft().getInstants());
+      assertEquals(
+          INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+          actual.getRight());
+    }
+  }
+
+  @Test
+  public void 
testGetDeltaCommitsSinceLatestCompletedLogCompactionWithEmptyDeltaCommits() {
+    HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
+    assertEquals(Option.empty(), 
CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+        timeline.getDeltaCommitTimeline(), timeline));
+  }
+
+  @Test
+  public void testGetLastLogCompactionWithCompletedLogCompaction() {
+    // Raw timeline: completed delta commit at "05" + inflight log compaction 
at "05"
+    HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"))
+            .collect(Collectors.toList()));
+
+    Option<HoodieInstant> result = 
CompactionUtils.getLastLogCompaction(rawActiveTimeline);
+    assertTrue(result.isPresent());
+    assertTrue(result.get().isCompleted());
+    assertEquals("05", result.get().requestedTime());
+  }
+
+  @Test
+  public void testGetLastLogCompactionWithPendingLogCompaction() {
+    // Raw timeline: inflight log compaction at "05" with no corresponding 
completed delta commit at "05"
+    HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, "05"))
+            .collect(Collectors.toList()));
+
+    Option<HoodieInstant> result = 
CompactionUtils.getLastLogCompaction(rawActiveTimeline);
+    assertTrue(result.isPresent());
+    assertFalse(result.get().isCompleted());
+    assertEquals(HoodieTimeline.LOG_COMPACTION_ACTION, 
result.get().getAction());
+    assertEquals("05", result.get().requestedTime());
+  }
+
+  @Test
+  public void testGetLastLogCompactionWithNoLogCompaction() {
+    HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"))
+            .collect(Collectors.toList()));
+
+    assertEquals(Option.empty(), 
CompactionUtils.getLastLogCompaction(rawActiveTimeline));
+  }
+
+  @Test
+  public void testGetLastLogCompactionWithEmptyTimeline() {
+    HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline();
+    assertEquals(Option.empty(), 
CompactionUtils.getLastLogCompaction(rawActiveTimeline));
+  }
+
+  @Test
+  public void 
testGetDeltaCommitsSinceLatestCompletedLogCompactionWithPendingLogCompaction() {
+    // Delta commit timeline: completed delta commits 01-03, plus inflight 05
+    HoodieActiveTimeline deltaCommitTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"))
+            .collect(Collectors.toList()));
+
+    // Raw timeline: log compaction at "04" is pending (no completed delta 
commit at "04")
+    HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, "04"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"))
+            .collect(Collectors.toList()));
+
+    assertEquals(Option.empty(), 
CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+        deltaCommitTimeline.getDeltaCommitTimeline(), rawActiveTimeline));
+  }
+
+  @Test
+  public void testLogCompactionSchedulingBelowThreshold() {
+    int logCompactionBlocksThreshold = 3;
+    // 2 delta commits since compaction at "05", no prior log compaction
+    HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"))
+            .collect(Collectors.toList()));
+
+    int deltasSinceCompaction = 
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+        .get().getLeft().countInstants();
+    Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+            activeTimeline.getDeltaCommitTimeline(), activeTimeline);
+    int deltasSinceLogCompaction = logCompactionInfo.isPresent() ? 
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+    int numDeltaCommitsSince = Math.min(deltasSinceCompaction, 
deltasSinceLogCompaction);
+    assertFalse(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+        "Log compaction should not be scheduled when delta commits < 
threshold");
+  }
+
+  /**
+   * Exactly {@code logCompactionBlocksThreshold} (3) completed delta commits follow the last
+   * completed compaction at "05", and the timeline contains no log compaction instant, so the
+   * gating count is expected to reach the threshold.
+   *
+   * <p>The same timeline is passed both as the delta-commit source and as the raw timeline.
+   * The test expects getDeltaCommitsSinceLatestCompletedLogCompaction to return a present
+   * result when no log compaction exists. If it returned empty, deltasSinceLogCompaction would
+   * fall back to 0, the min would be 0, and the assertTrue below would fail.
+   *
+   * <p>NOTE(review): the min/threshold gate is re-implemented inline here rather than being
+   * exercised through ScheduleCompactionActionExecutor. A regression in the executor's own
+   * gating logic would therefore not be caught by this test.
+   */
+  @Test
+  public void testLogCompactionSchedulingAtThreshold() {
+    int logCompactionBlocksThreshold = 3;
+    // 3 delta commits since compaction at "05", no prior log compaction
+    HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"))
+            .collect(Collectors.toList()));
+
+    int deltasSinceCompaction = 
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+        .get().getLeft().countInstants();
+    Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+            activeTimeline.getDeltaCommitTimeline(), activeTimeline);
+    int deltasSinceLogCompaction = logCompactionInfo.isPresent() ? 
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+    // Mirror the scheduler's gate: the smaller of the two counts must reach the threshold.
+    int numDeltaCommitsSince = Math.min(deltasSinceCompaction, 
deltasSinceLogCompaction);
+    assertTrue(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+        "Log compaction should be scheduled when delta commits >= threshold");
+  }
+
+  /**
+   * After a completed log compaction at "08", only the delta commits that follow it should
+   * count toward the log compaction threshold. This holds even though 4 delta commits
+   * ("06"-"09") follow the last completed compaction at "05".
+   *
+   * <p>In rawActiveTimeline, the completed log compaction is modelled as two instants with the
+   * same timestamp ("08"): an INFLIGHT LOG_COMPACTION_ACTION and a COMPLETED
+   * DELTA_COMMIT_ACTION. activeTimeline shows only the completed delta commit. That is why the
+   * raw timeline is passed to the helper separately.
+   *
+   * <p>NOTE(review): only the threshold comparison is asserted. Pinning
+   * deltasSinceLogCompaction (the comment below expects 1) would make a regression in the
+   * reset logic easier to detect and diagnose.
+   */
+  @Test
+  public void testLogCompactionSchedulingResetsAfterCompletedLogCompaction() {
+    int logCompactionBlocksThreshold = 3;
+    // 4 delta commits since compaction, but only 1 since completed log 
compaction at "08"
+    HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+            .collect(Collectors.toList()));
+    // Raw view: the INFLIGHT LOG_COMPACTION_ACTION at "08" marks that delta commit as a log compaction.
+    HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, "08"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+            .collect(Collectors.toList()));
+
+    int deltasSinceCompaction = 
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+        .get().getLeft().countInstants();
+    Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+            activeTimeline.getDeltaCommitTimeline(), rawActiveTimeline);
+    int deltasSinceLogCompaction = logCompactionInfo.isPresent() ? 
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+    // Mirror the scheduler's gate: the smaller of the two counts must reach the threshold.
+    int numDeltaCommitsSince = Math.min(deltasSinceCompaction, 
deltasSinceLogCompaction);
+    assertFalse(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+        "Log compaction should not be scheduled after reset when delta commits 
< threshold");
+  }
+
+  /**
+   * A pending (INFLIGHT, never completed) log compaction at "08" must block scheduling, even
+   * though the delta-commit count since the last completed compaction at "05" is above the
+   * threshold.
+   *
+   * <p>The test expects getDeltaCommitsSinceLatestCompletedLogCompaction to return empty in
+   * this case. The empty result maps to a count of 0, so the min is 0 and the threshold check
+   * fails.
+   *
+   * <p>NOTE(review): the two mocks disagree about instant "08":
+   * <ul>
+   *   <li>activeTimeline lists "08" as a COMPLETED DELTA_COMMIT_ACTION;</li>
+   *   <li>rawActiveTimeline has only an INFLIGHT LOG_COMPACTION_ACTION at "08", with no
+   *       completed delta commit.</li>
+   * </ul>
+   * If "08" is meant to still be pending, activeTimeline presumably should not contain it.
+   * deltasSinceCompaction would then be 4, not 5. TODO confirm intent.
+   */
+  @Test
+  public void testLogCompactionSchedulingWithPendingLogCompaction() {
+    int logCompactionBlocksThreshold = 3;
+    // 5 delta commits since compaction (above threshold), but pending log 
compaction blocks scheduling
+    HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "10"))
+            .collect(Collectors.toList()));
+    // Raw view: "08" exists only as an INFLIGHT log compaction (no completed delta commit for it).
+    HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, "08"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "09"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "10"))
+            .collect(Collectors.toList()));
+
+    int deltasSinceCompaction = 
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+        .get().getLeft().countInstants();
+    Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+            activeTimeline.getDeltaCommitTimeline(), rawActiveTimeline);
+    int deltasSinceLogCompaction = logCompactionInfo.isPresent() ? 
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+    assertEquals(5, deltasSinceCompaction);
+    assertFalse(logCompactionInfo.isPresent(),
+        "Pending log compaction should return empty");
+    // With the empty result mapped to 0, the min is 0 and can never reach the threshold.
+    int numDeltaCommitsSince = Math.min(deltasSinceCompaction, 
deltasSinceLogCompaction);
+    assertFalse(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+        "Log compaction should not be scheduled when there is a pending log 
compaction");
+  }
+
+  /**
+   * An inflight compaction at "06" must not be treated as the reset point. Delta commits are
+   * counted from the last completed compaction at "03", which gives 4. That is below the
+   * threshold of 5, so log compaction must not be scheduled.
+   *
+   * <p>The timeline contains no log compaction instant, so the same timeline is also passed as
+   * the raw timeline.
+   *
+   * <p>NOTE(review): deltasSinceCompaction (4) is already below the threshold. The min is
+   * therefore below the threshold whatever getDeltaCommitsSinceLatestCompletedLogCompaction
+   * returns. Only the assertEquals(4, ...) actually constrains behavior here.
+   */
+  @Test
+  public void 
testLogCompactionSchedulingWithPendingCompactionAndThresholdNotMet() {
+    int logCompactionBlocksThreshold = 5;
+    // Completed compaction at "03", pending (inflight) compaction at "06",
+    // 2 delta commits ("04", "05") between completed and pending compaction,
+    // plus 2 more ("07", "08") after pending compaction. No prior log 
compaction.
+    // Delta commits since last *completed* compaction = 4, which is below 
threshold of 5.
+    HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+        Stream.of(
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "03"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMPACTION_ACTION, "06"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "08"))
+            .collect(Collectors.toList()));
+    // No log compaction instants exist, so the raw timeline is the same as the active one.
+    HoodieActiveTimeline rawActiveTimeline = activeTimeline;
+
+    int deltasSinceCompaction = 
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+        .get().getLeft().countInstants();
+    Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+            activeTimeline.getDeltaCommitTimeline(), rawActiveTimeline);
+    int deltasSinceLogCompaction = logCompactionInfo.isPresent() ? 
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+    assertEquals(4, deltasSinceCompaction);
+    // Mirror the scheduler's gate: the smaller of the two counts must reach the threshold.
+    int numDeltaCommitsSince = Math.min(deltasSinceCompaction, 
deltasSinceLogCompaction);
+    assertFalse(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+        "Log compaction should not be scheduled with pending compaction and 
delta commits < threshold");
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testGetOldestInstantToKeepForCompaction(boolean 
hasCompletedCompaction) {


Reply via email to