This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 42243862f02 [HUDI-5505] Fix counting of delta commits since last compaction in ScheduleCompactionActionExecutor.getLatestDeltaCommitInfo (#11251)
42243862f02 is described below

commit 42243862f0271fda16e70afdbfde61b47792ff70
Author: Alexander Erofeev <[email protected]>
AuthorDate: Wed May 29 07:24:20 2024 +0700

    [HUDI-5505] Fix counting of delta commits since last compaction in ScheduleCompactionActionExecutor.getLatestDeltaCommitInfo (#11251)
---
 .../compact/ScheduleCompactionActionExecutor.java  |  2 +-
 .../table/action/compact/TestHoodieCompactor.java  | 34 +++++++++++++++++++++-
 .../apache/hudi/common/util/CompactionUtils.java   | 15 ++++++++++
 3 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 941d93fd350..5dc0f067a3d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -129,7 +129,7 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec
 
   private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
     Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
-        CompactionUtils.getDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
+        CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
     if (deltaCommitsInfo.isPresent()) {
       return Option.of(Pair.of(
           deltaCommitsInfo.get().getLeft().countInstants(),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index deda77376af..78086a64359 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -89,8 +89,12 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness {
   }
 
   public HoodieWriteConfig getConfig() {
+    return getConfig(1);
+  }
+
+  public HoodieWriteConfig getConfig(int numCommitsBeforeCompaction) {
     return getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(numCommitsBeforeCompaction).build())
         .withMetricsConfig(getMetricsConfig())
         .build();
   }
@@ -181,6 +185,34 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness {
     }
   }
 
+  @Test
+  public void testNeedCompactionCondition() throws Exception {
+    HoodieWriteConfig config = getConfig(3);
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      // insert 100 records.
+      String newCommitTime = "100";
+      writeClient.startCommitWithTime(newCommitTime);
+
+      // commit 1
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // commit 2
+      updateRecords(config, "101", records);
+
+      // commit 3 (inflight)
+      newCommitTime = "102";
+      writeClient.startCommitWithTime(newCommitTime);
+      metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
+          HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
+
+      // check that compaction will not be scheduled
+      String compactionTime = "107";
+      assertFalse(writeClient.scheduleCompactionAtInstant(compactionTime, Option.empty()));
+    }
+  }
+
   @Test
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 321fdd0fce5..5c2270db919 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -274,6 +274,21 @@ public class CompactionUtils {
     return metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants();
   }
 
+  /**
+   * Returns a pair of (timeline containing the completed delta commits after the latest completed
+   * compaction commit, the completed compaction commit instant), if the latest completed
+   * compaction commit is present; a pair of (timeline containing all the completed delta commits,
+   * the first delta commit instant), if there is no completed compaction commit.
+   *
+   * @param activeTimeline Active timeline of a table.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> getCompletedDeltaCommitsSinceLatestCompaction(
+      HoodieActiveTimeline activeTimeline) {
+    return getDeltaCommitsSinceLatestCompaction(activeTimeline)
+        .map(pair -> Pair.of(pair.getLeft().filterCompletedInstants(), pair.getRight()));
+  }
+
   /**
    * Returns a pair of (timeline containing the delta commits after the latest completed
    * compaction commit, the completed compaction commit instant), if the latest completed

Reply via email to