This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 33e73d9deba [HUDI-7460] Relaxing compaction scheduling when there are pending delta commits (#10791)
33e73d9deba is described below
commit 33e73d9deba96c45c5cf58a36f87258e0aa3f503
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 14 09:58:38 2024 -0700
[HUDI-7460] Relaxing compaction scheduling when there are pending delta commits (#10791)
---
.../action/compact/ScheduleCompactionActionExecutor.java | 11 ++++++-----
.../apache/hudi/table/action/compact/TestAsyncCompaction.java | 11 +++++++----
2 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index d23f12f4762..f529285e29d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -93,11 +93,12 @@ public class ScheduleCompactionActionExecutor<T, I, K, O>
extends BaseActionExec
// TODO(yihua): this validation is removed for Java client used by
kafka-connect. Need to revisit this.
if (config.getEngineType() == EngineType.SPARK) {
// if there are inflight writes, their instantTime must not be less
than that of compaction instant time
-
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction().firstInstant()
- .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
-
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(),
HoodieTimeline.GREATER_THAN, instantTime),
- "Earliest write inflight instant time must be later than
compaction time. Earliest :" + earliestInflight
- + ", Compaction scheduled at " + instantTime));
+ Option<HoodieInstant> earliestInflightOpt =
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction().firstInstant();
+ if (earliestInflightOpt.isPresent() &&
!HoodieTimeline.compareTimestamps(earliestInflightOpt.get().getTimestamp(),
HoodieTimeline.GREATER_THAN, instantTime)) {
+ LOG.warn("Earliest write inflight instant time must be later than
compaction time. Earliest :" + earliestInflightOpt.get()
+ + ", Compaction scheduled at " + instantTime + ". Hence skipping
to schedule compaction");
+ return Option.empty();
+ }
}
// Committed and pending compaction instants should have strictly lower
timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 44f2db7193c..cf915b4c14a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -42,6 +43,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -246,10 +248,11 @@ public class TestAsyncCompaction extends
CompactionTestBase {
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstantTime, inflightInstant.getTimestamp(),
"inflight instant has expected instant time");
- assertThrows(IllegalArgumentException.class, () -> {
- // Schedule compaction but do not run them
- scheduleCompaction(compactionInstantTime, client, cfg);
- }, "Earliest ingestion inflight instant time must be later than compaction
time");
+ // since there is a pending delta commit, compaction schedule should not
generate any plan
+ client = getHoodieWriteClient(cfg);
+ client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+ metaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
+
assertFalse(metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().isPresent());
}
@Test