This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 42243862f02 [HUDI-5505] Fix counting of delta commits since last
compaction in ScheduleCompactionActionExecutor.getLatestDeltaCommitInfo (#11251)
42243862f02 is described below
commit 42243862f0271fda16e70afdbfde61b47792ff70
Author: Alexander Erofeev <[email protected]>
AuthorDate: Wed May 29 07:24:20 2024 +0700
[HUDI-5505] Fix counting of delta commits since last compaction in
ScheduleCompactionActionExecutor.getLatestDeltaCommitInfo (#11251)
---
.../compact/ScheduleCompactionActionExecutor.java | 2 +-
.../table/action/compact/TestHoodieCompactor.java | 34 +++++++++++++++++++++-
.../apache/hudi/common/util/CompactionUtils.java | 15 ++++++++++
3 files changed, 49 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 941d93fd350..5dc0f067a3d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -129,7 +129,7 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec
private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
- CompactionUtils.getDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
+ CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
if (deltaCommitsInfo.isPresent()) {
return Option.of(Pair.of(
deltaCommitsInfo.get().getLeft().countInstants(),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index deda77376af..78086a64359 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -89,8 +89,12 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness {
}
public HoodieWriteConfig getConfig() {
+ return getConfig(1);
+ }
+
+ public HoodieWriteConfig getConfig(int numCommitsBeforeCompaction) {
return getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(numCommitsBeforeCompaction).build())
.withMetricsConfig(getMetricsConfig())
.build();
}
@@ -181,6 +185,34 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness {
}
}
+ @Test
+ public void testNeedCompactionCondition() throws Exception {
+ HoodieWriteConfig config = getConfig(3);
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+ // insert 100 records.
+ String newCommitTime = "100";
+ writeClient.startCommitWithTime(newCommitTime);
+
+ // commit 1
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+ JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+ writeClient.insert(recordsRDD, newCommitTime).collect();
+
+ // commit 2
+ updateRecords(config, "101", records);
+
+ // commit 3 (inflight)
+ newCommitTime = "102";
+ writeClient.startCommitWithTime(newCommitTime);
+ metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
+ HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
+
+ // check that compaction will not be scheduled
+ String compactionTime = "107";
+ assertFalse(writeClient.scheduleCompactionAtInstant(compactionTime, Option.empty()));
+ }
+ }
+
@Test
public void testWriteStatusContentsAfterCompaction() throws Exception {
// insert 100 records
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 321fdd0fce5..5c2270db919 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -274,6 +274,21 @@ public class CompactionUtils {
return metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants();
}
+ /**
+ * Returns a pair of (timeline containing the completed delta commits after the latest completed
+ * compaction commit, the completed compaction commit instant), if the latest completed
+ * compaction commit is present; a pair of (timeline containing all the completed delta commits,
+ * the first delta commit instant), if there is no completed compaction commit.
+ *
+ * @param activeTimeline Active timeline of a table.
+ * @return Pair of timeline containing delta commits and an instant.
+ */
+ public static Option<Pair<HoodieTimeline, HoodieInstant>> getCompletedDeltaCommitsSinceLatestCompaction(
+ HoodieActiveTimeline activeTimeline) {
+ return getDeltaCommitsSinceLatestCompaction(activeTimeline)
+ .map(pair -> Pair.of(pair.getLeft().filterCompletedInstants(), pair.getRight()));
+ }
+
/**
* Returns a pair of (timeline containing the delta commits after the latest
completed
* compaction commit, the completed compaction commit instant), if the
latest completed