This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 33e73d9deba [HUDI-7460] Relaxing compaction scheduling when there are pending delta commits (#10791)
33e73d9deba is described below

commit 33e73d9deba96c45c5cf58a36f87258e0aa3f503
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 14 09:58:38 2024 -0700

    [HUDI-7460] Relaxing compaction scheduling when there are pending delta commits (#10791)
---
 .../action/compact/ScheduleCompactionActionExecutor.java      | 11 ++++++-----
 .../apache/hudi/table/action/compact/TestAsyncCompaction.java | 11 +++++++----
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index d23f12f4762..f529285e29d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -93,11 +93,12 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> 
extends BaseActionExec
       // TODO(yihua): this validation is removed for Java client used by 
kafka-connect.  Need to revisit this.
       if (config.getEngineType() == EngineType.SPARK) {
         // if there are inflight writes, their instantTime must not be less 
than that of compaction instant time
-        
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction().firstInstant()
-            .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
-                
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), 
HoodieTimeline.GREATER_THAN, instantTime),
-                "Earliest write inflight instant time must be later than 
compaction time. Earliest :" + earliestInflight
-                    + ", Compaction scheduled at " + instantTime));
+        Option<HoodieInstant> earliestInflightOpt = 
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction().firstInstant();
+        if (earliestInflightOpt.isPresent() && 
!HoodieTimeline.compareTimestamps(earliestInflightOpt.get().getTimestamp(), 
HoodieTimeline.GREATER_THAN, instantTime)) {
+          LOG.warn("Earliest write inflight instant time must be later than 
compaction time. Earliest :" + earliestInflightOpt.get()
+              + ", Compaction scheduled at " + instantTime + ". Hence skipping 
to schedule compaction");
+          return Option.empty();
+        }
       }
       // Committed and pending compaction instants should have strictly lower 
timestamps
       List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 44f2db7193c..cf915b4c14a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -42,6 +43,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -246,10 +248,11 @@ public class TestAsyncCompaction extends 
CompactionTestBase {
         
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
     assertEquals(inflightInstantTime, inflightInstant.getTimestamp(), 
"inflight instant has expected instant time");
 
-    assertThrows(IllegalArgumentException.class, () -> {
-      // Schedule compaction but do not run them
-      scheduleCompaction(compactionInstantTime, client, cfg);
-    }, "Earliest ingestion inflight instant time must be later than compaction 
time");
+    // since there is a pending delta commit, compaction schedule should not 
generate any plan
+    client = getHoodieWriteClient(cfg);
+    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
+    
assertFalse(metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().isPresent());
   }
 
   @Test

Reply via email to