This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9af7f29ceef7 perf(table-services): Only attempt scheduling log
compaction if number of deltacommits is at least LogCompactionBlocksThreshold
(#18306)
9af7f29ceef7 is described below
commit 9af7f29ceef7f15e582773789c94e03b32a470be
Author: Krishen <[email protected]>
AuthorDate: Wed Apr 1 11:32:46 2026 -0700
perf(table-services): Only attempt scheduling log compaction if number of
deltacommits is at least LogCompactionBlocksThreshold (#18306)
Currently, log compaction is always scheduled whenever the operation type
is LOG_COMPACT, regardless of how many delta commits have occurred since the
last log compaction. This leads to unnecessary log compaction scheduling,
wasting resources when few delta commits (and therefore most likely only a few
log files/blocks) have accumulated. This change is especially helpful for the Metadata
table with RLI, where all file groups in RLI are updated with new log
files/blocks at a roughly equal [...]
Summary and Changelog
Changes log compaction scheduling to use the LogCompactionBlocksThreshold
config as a gating threshold. Instead of unconditionally scheduling log
compaction, the scheduler now counts delta commits since the last compaction
and the last log compaction, takes the minimum of the two, and only schedules
log compaction when that count meets or exceeds the threshold.
Added CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(), which
returns the delta commits since the most recent completed log
compaction by inspecting the raw active timeline (needed because completed log
compaction instants transition from LOG_COMPACTION_ACTION to
DELTA_COMMIT_ACTION), via the new CompactionUtils.getLastLogCompaction() helper.
Added
ScheduleCompactionActionExecutor.getLatestDeltaCommitInfoSinceLogCompaction(), which
creates a raw active timeline and delegates to the new CompactionUtils method.
Renamed getLatestDeltaCommitInfo() to
getLatestDeltaCommitInfoSinceCompaction() for clarity
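Updated needCompact() to replace the unconditional return true for
LOG_COMPACT with threshold-based logic (in the new needLogCompact() helper):
Math.min(deltaCommitsSinceCompaction, deltaCommitsSinceLogCompaction) >=
logCompactionBlocksThreshold. If the latest log compaction is still pending,
the count since log compaction is treated as 0, so no new log compaction is
scheduled until it completes.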
Added unit tests for getDeltaCommitsSinceLatestCompletedLogCompaction and
getLastLogCompaction covering completed, pending, and absent log compactions
and the empty-timeline case, plus tests of the threshold-based scheduling decision
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../compact/ScheduleCompactionActionExecutor.java | 65 ++++-
.../apache/hudi/common/util/CompactionUtils.java | 65 +++++
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 2 +-
.../hudi/common/util/TestCompactionUtils.java | 315 +++++++++++++++++++++
4 files changed, 431 insertions(+), 16 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index b3433daa7e5c..298beb15b7e0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -138,7 +139,7 @@ public class ScheduleCompactionActionExecutor<T, I, K, O>
extends BaseTableServi
return new HoodieCompactionPlan();
}
- private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
+ private Option<Pair<Integer, String>>
getLatestDeltaCommitInfoSinceCompaction() {
Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
if (deltaCommitsInfo.isPresent()) {
@@ -160,55 +161,69 @@ public class ScheduleCompactionActionExecutor<T, I, K, O>
extends BaseTableServi
return Option.empty();
}
+ private Option<Pair<Integer, String>>
getLatestDeltaCommitInfoSinceLogCompaction() {
+ HoodieActiveTimeline rawActiveTimeline =
table.getMetaClient().getTableFormat()
+ .getTimelineFactory().createActiveTimeline(table.getMetaClient(),
false);
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ table.getActiveTimeline().getDeltaCommitTimeline(),
rawActiveTimeline);
+ if (deltaCommitsInfo.isPresent()) {
+ return Option.of(Pair.of(
+ deltaCommitsInfo.get().getLeft().countInstants(),
+ deltaCommitsInfo.get().getRight().requestedTime()));
+ }
+ return Option.empty();
+ }
+
private boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
- Option<Pair<Integer, String>> latestDeltaCommitInfoOption =
getLatestDeltaCommitInfo();
- if (!latestDeltaCommitInfoOption.isPresent()) {
+ Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption =
getLatestDeltaCommitInfoSinceCompaction();
+ if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
return false;
}
- Pair<Integer, String> latestDeltaCommitInfo =
latestDeltaCommitInfoOption.get();
+ Pair<Integer, String> latestDeltaCommitInfoSinceCompact =
latestDeltaCommitInfoSinceCompactOption.get();
if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
- return true;
+ return needLogCompact(latestDeltaCommitInfoSinceCompact);
}
int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
int inlineCompactDeltaSecondsMax =
config.getInlineCompactDeltaSecondsMax();
switch (compactionTriggerStrategy) {
case NUM_COMMITS:
- compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfo.getLeft();
+ compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfoSinceCompact.getLeft();
if (compactable) {
log.info("The delta commits >= {}, trigger compaction scheduler.",
inlineCompactDeltaCommitMax);
}
break;
case NUM_COMMITS_AFTER_LAST_REQUEST:
- latestDeltaCommitInfoOption =
getLatestDeltaCommitInfoSinceLastCompactionRequest();
+ latestDeltaCommitInfoSinceCompactOption =
getLatestDeltaCommitInfoSinceLastCompactionRequest();
- if (!latestDeltaCommitInfoOption.isPresent()) {
+ if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
return false;
}
- latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
- compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfo.getLeft();
+ latestDeltaCommitInfoSinceCompact =
latestDeltaCommitInfoSinceCompactOption.get();
+ compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfoSinceCompact.getLeft();
if (compactable) {
log.info("The delta commits >= {} since the last compaction request,
trigger compaction scheduler.", inlineCompactDeltaCommitMax);
}
break;
case TIME_ELAPSED:
- compactable = inlineCompactDeltaSecondsMax <=
parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfo.getRight());
+ compactable = inlineCompactDeltaSecondsMax <=
parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfoSinceCompact.getRight());
if (compactable) {
log.info("The elapsed time >={}s, trigger compaction scheduler.",
inlineCompactDeltaSecondsMax);
}
break;
case NUM_OR_TIME:
- compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfo.getLeft()
- || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfo.getRight());
+ compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfoSinceCompact.getLeft()
+ || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfoSinceCompact.getRight());
if (compactable) {
log.info("The delta commits >= {} or elapsed_time >={}s, trigger
compaction scheduler.", inlineCompactDeltaCommitMax,
inlineCompactDeltaSecondsMax);
}
break;
case NUM_AND_TIME:
- compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfo.getLeft()
- && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfo.getRight());
+ compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfoSinceCompact.getLeft()
+ && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfoSinceCompact.getRight());
if (compactable) {
log.info("The delta commits >= {} and elapsed_time >={}s, trigger
compaction scheduler.", inlineCompactDeltaCommitMax,
inlineCompactDeltaSecondsMax);
@@ -220,6 +235,26 @@ public class ScheduleCompactionActionExecutor<T, I, K, O>
extends BaseTableServi
return compactable;
}
+ /**
+ * Determines whether log compaction should be scheduled based on the number
of delta commits
+ * since the last compaction and the last log compaction, compared against
the
+ * {@code hoodie.log.compaction.blocks.threshold} config.
+ */
+ private boolean needLogCompact(Pair<Integer, String>
latestDeltaCommitInfoSinceCompact) {
+ Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption =
getLatestDeltaCommitInfoSinceLogCompaction();
+ int numDeltaCommitsSinceLatestCompaction =
latestDeltaCommitInfoSinceCompact.getLeft();
+ int numDeltaCommitsSinceLatestLogCompaction =
latestDeltaCommitInfoSinceLogCompactOption.isPresent()
+ ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
+ : 0;
+
+ int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction,
numDeltaCommitsSinceLatestLogCompaction);
+ boolean shouldLogCompact = numDeltaCommitsSince >=
config.getLogCompactionBlocksThreshold();
+ if (shouldLogCompact) {
+ log.info("There have been {} delta commits since last compaction or log
compaction, triggering log compaction.", numDeltaCommitsSince);
+ }
+ return shouldLogCompact;
+ }
+
private Long parsedToSeconds(String time) {
return TimelineUtils.parseDateFromInstantTimeSafely(time).orElseThrow(()
-> new HoodieCompactionException("Failed to parse timestamp " + time))
.getTime() / 1000;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 0123274b42ef..f1a7b2b77c9d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -34,6 +34,9 @@ import
org.apache.hudi.common.table.timeline.versioning.compaction.CompactionV2M
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
@@ -49,6 +52,7 @@ import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deseri
* Helper class to generate compaction plan from FileGroup/FileSlice
abstraction.
*/
public class CompactionUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactionUtils.class);
public static final Integer COMPACTION_METADATA_VERSION_1 =
CompactionV1MigrationHandler.VERSION;
public static final Integer COMPACTION_METADATA_VERSION_2 =
CompactionV2MigrationHandler.VERSION;
@@ -363,6 +367,67 @@ public class CompactionUtils {
}
}
+ /**
+ * Returns the most recent log compaction instant from the raw active
timeline.
+ *
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return The latest log compaction instant, or empty if no log compaction
is present.
+ */
+ public static Option<HoodieInstant> getLastLogCompaction(final
HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionInstantOpt =
Option.fromJavaOptional(
+ rawActiveTimeline
+ .filterPendingLogCompactionTimeline()
+ .getReverseOrderedInstants()
+ .findFirst()
+ );
+ if (!lastLogCompactionInstantOpt.isPresent()) {
+ return Option.empty();
+ }
+ String logCompactionTimestamp =
lastLogCompactionInstantOpt.get().requestedTime();
+ Option<HoodieInstant> completedInstantOpt = Option.fromJavaOptional(
+ rawActiveTimeline
+ .findInstantsInClosedRange(logCompactionTimestamp,
logCompactionTimestamp)
+ .getInstantsAsStream()
+ .filter(HoodieInstant::isCompleted)
+ .findFirst()
+ );
+ return completedInstantOpt.isPresent() ? completedInstantOpt :
lastLogCompactionInstantOpt;
+ }
+
+ /**
+ * Returns a pair of (timeline containing the delta commits after the latest
completed
+ * log compaction delta commit, the completed log compaction commit
instant), if the latest completed
+ * log compaction commit is present; a pair of (timeline containing all the
delta commits,
+ * the first delta commit instant), if there is no completed log compaction
commit.
+ *
+ * @param deltaCommitTimeline Active timeline of table that contains only
delta commits.
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return Pair of timeline containing delta commits and an instant.
+ */
+ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLatestCompletedLogCompaction(
+ final HoodieTimeline deltaCommitTimeline,
+ final HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionOpt =
getLastLogCompaction(rawActiveTimeline);
+
+ if (lastLogCompactionOpt.isPresent()) {
+ HoodieInstant lastLogCompaction = lastLogCompactionOpt.get();
+ if (lastLogCompaction.isCompleted()) {
+ return
Option.of(Pair.of(deltaCommitTimeline.findInstantsModifiedAfterByCompletionTime(
+ lastLogCompaction.requestedTime()), lastLogCompaction));
+ } else {
+ LOG.info("Last log compaction instant {} is in pending state,
returning empty value.", lastLogCompaction.requestedTime());
+ return Option.empty();
+ }
+ } else {
+ Option<HoodieInstant> firstDeltaCommitInstantOption =
deltaCommitTimeline.firstInstant();
+ if (firstDeltaCommitInstantOption.isPresent()) {
+ return Option.of(Pair.of(deltaCommitTimeline,
firstDeltaCommitInstantOption.get()));
+ } else {
+ return Option.empty();
+ }
+ }
+ }
+
/**
* Gets the earliest instant to retain for MOR compaction.
* If there is no completed compaction,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 9bf56a9dc4d4..a4e869428c24 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -961,7 +961,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
if (mdtCompactionEnabled) {
if (isLogCompaction) {
conf.setString(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(),
"true");
-
conf.setString(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "2");
+
conf.setString(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "1");
} else {
conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 1);
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index ee9bb2f37a30..ac7dac4eb429 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -62,6 +62,7 @@ import static
org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
import static
org.apache.hudi.common.util.CompactionUtils.COMPACTION_METADATA_VERSION_1;
import static
org.apache.hudi.common.util.CompactionUtils.LATEST_COMPACTION_METADATA_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -349,6 +350,320 @@ public class TestCompactionUtils extends
HoodieCommonTestHarness {
assertEquals(Option.empty(),
CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testGetDeltaCommitsSinceLatestCompletedLogCompaction(boolean
hasCompletedLogCompaction) {
+ if (hasCompletedLogCompaction) {
+ // Delta commit timeline: completed delta commits 01-05, 07-08, plus
inflight 09
+ HoodieActiveTimeline deltaCommitTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+ .collect(Collectors.toList()));
+
+ // Raw timeline includes a log compaction instant at "05" (inflight)
plus inflight delta commit "09"
+ HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, "05"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+ .collect(Collectors.toList()));
+
+ Pair<HoodieTimeline, HoodieInstant> actual =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ deltaCommitTimeline.getDeltaCommitTimeline(),
rawActiveTimeline).get();
+ assertEquals(
+ Stream.of(
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+ .collect(Collectors.toList()),
+ actual.getLeft().getInstants());
+ assertEquals(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+ actual.getRight());
+ } else {
+ // No log compaction instants: raw timeline is same as delta commit
timeline
+ HoodieActiveTimeline deltaCommitTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+ .collect(Collectors.toList()));
+ HoodieActiveTimeline rawActiveTimeline = deltaCommitTimeline;
+
+ Pair<HoodieTimeline, HoodieInstant> actual =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ deltaCommitTimeline.getDeltaCommitTimeline(),
rawActiveTimeline).get();
+ assertEquals(
+ Stream.of(
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+ .collect(Collectors.toList()),
+ actual.getLeft().getInstants());
+ assertEquals(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+ actual.getRight());
+ }
+ }
+
+ @Test
+ public void
testGetDeltaCommitsSinceLatestCompletedLogCompactionWithEmptyDeltaCommits() {
+ HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
+ assertEquals(Option.empty(),
CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ timeline.getDeltaCommitTimeline(), timeline));
+ }
+
+ @Test
+ public void testGetLastLogCompactionWithCompletedLogCompaction() {
+ // Raw timeline: completed delta commit at "05" + inflight log compaction
at "05"
+ HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"))
+ .collect(Collectors.toList()));
+
+ Option<HoodieInstant> result =
CompactionUtils.getLastLogCompaction(rawActiveTimeline);
+ assertTrue(result.isPresent());
+ assertTrue(result.get().isCompleted());
+ assertEquals("05", result.get().requestedTime());
+ }
+
+ @Test
+ public void testGetLastLogCompactionWithPendingLogCompaction() {
+ // Raw timeline: inflight log compaction at "05" with no corresponding
completed delta commit at "05"
+ HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, "05"))
+ .collect(Collectors.toList()));
+
+ Option<HoodieInstant> result =
CompactionUtils.getLastLogCompaction(rawActiveTimeline);
+ assertTrue(result.isPresent());
+ assertFalse(result.get().isCompleted());
+ assertEquals(HoodieTimeline.LOG_COMPACTION_ACTION,
result.get().getAction());
+ assertEquals("05", result.get().requestedTime());
+ }
+
+ @Test
+ public void testGetLastLogCompactionWithNoLogCompaction() {
+ HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"))
+ .collect(Collectors.toList()));
+
+ assertEquals(Option.empty(),
CompactionUtils.getLastLogCompaction(rawActiveTimeline));
+ }
+
+ @Test
+ public void testGetLastLogCompactionWithEmptyTimeline() {
+ HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline();
+ assertEquals(Option.empty(),
CompactionUtils.getLastLogCompaction(rawActiveTimeline));
+ }
+
+ @Test
+ public void
testGetDeltaCommitsSinceLatestCompletedLogCompactionWithPendingLogCompaction() {
+ // Delta commit timeline: completed delta commits 01-03, plus inflight 05
+ HoodieActiveTimeline deltaCommitTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"))
+ .collect(Collectors.toList()));
+
+ // Raw timeline: log compaction at "04" is pending (no completed delta
commit at "04")
+ HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, "04"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"))
+ .collect(Collectors.toList()));
+
+ assertEquals(Option.empty(),
CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ deltaCommitTimeline.getDeltaCommitTimeline(), rawActiveTimeline));
+ }
+
+ @Test
+ public void testLogCompactionSchedulingBelowThreshold() {
+ int logCompactionBlocksThreshold = 3;
+ // 2 delta commits since compaction at "05", no prior log compaction
+ HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"))
+ .collect(Collectors.toList()));
+
+ int deltasSinceCompaction =
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+ .get().getLeft().countInstants();
+ Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ activeTimeline.getDeltaCommitTimeline(), activeTimeline);
+ int deltasSinceLogCompaction = logCompactionInfo.isPresent() ?
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+ int numDeltaCommitsSince = Math.min(deltasSinceCompaction,
deltasSinceLogCompaction);
+ assertFalse(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+ "Log compaction should not be scheduled when delta commits <
threshold");
+ }
+
+ @Test
+ public void testLogCompactionSchedulingAtThreshold() {
+ int logCompactionBlocksThreshold = 3;
+ // 3 delta commits since compaction at "05", no prior log compaction
+ HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"))
+ .collect(Collectors.toList()));
+
+ int deltasSinceCompaction =
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+ .get().getLeft().countInstants();
+ Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ activeTimeline.getDeltaCommitTimeline(), activeTimeline);
+ int deltasSinceLogCompaction = logCompactionInfo.isPresent() ?
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+ int numDeltaCommitsSince = Math.min(deltasSinceCompaction,
deltasSinceLogCompaction);
+ assertTrue(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+ "Log compaction should be scheduled when delta commits >= threshold");
+ }
+
+ @Test
+ public void testLogCompactionSchedulingResetsAfterCompletedLogCompaction() {
+ int logCompactionBlocksThreshold = 3;
+ // 4 delta commits since compaction, but only 1 since completed log
compaction at "08"
+ HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+ .collect(Collectors.toList()));
+ HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, "08"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+ .collect(Collectors.toList()));
+
+ int deltasSinceCompaction =
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+ .get().getLeft().countInstants();
+ Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ activeTimeline.getDeltaCommitTimeline(), rawActiveTimeline);
+ int deltasSinceLogCompaction = logCompactionInfo.isPresent() ?
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+ int numDeltaCommitsSince = Math.min(deltasSinceCompaction,
deltasSinceLogCompaction);
+ assertFalse(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+ "Log compaction should not be scheduled after reset when delta commits
< threshold");
+ }
+
+ /**
+ * Verifies that log compaction is not scheduled while an earlier log
+ * compaction is still pending, even though the number of delta commits
+ * since the last compaction (5) is above the threshold (3).
+ */
+ @Test
+ public void testLogCompactionSchedulingWithPendingLogCompaction() {
+ int logCompactionBlocksThreshold = 3;
+ // 5 delta commits since compaction (above threshold), but a pending log
+ // compaction blocks scheduling.
+ HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "10"))
+ .collect(Collectors.toList()));
+ // Raw timeline: "08" appears only as an INFLIGHT logcompaction, with no
+ // COMPLETED deltacommit at "08", i.e. the log compaction is still pending.
+ // NOTE(review): activeTimeline above lists "08" as a COMPLETED deltacommit,
+ // so the two mocks disagree about "08" (and the assertEquals(5, ...) below
+ // counts it). Confirm this matches how the real active timeline renders a
+ // pending log compaction.
+ HoodieActiveTimeline rawActiveTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "06"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, "08"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "09"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "10"))
+ .collect(Collectors.toList()));
+
+ int deltasSinceCompaction =
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+ .get().getLeft().countInstants();
+ Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ activeTimeline.getDeltaCommitTimeline(), rawActiveTimeline);
+ // An absent result (e.g. a pending log compaction) is treated as 0 delta
+ // commits since the last log compaction.
+ int deltasSinceLogCompaction = logCompactionInfo.isPresent() ?
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+ assertEquals(5, deltasSinceCompaction);
+ assertFalse(logCompactionInfo.isPresent(),
+ "Pending log compaction should return empty");
+ // Re-implements the scheduler's gating inline: schedule only when
+ // min(deltas since compaction, deltas since log compaction) >= threshold.
+ // Because logCompactionInfo is empty, the minimum is 0, so this holds for
+ // any positive threshold.
+ // NOTE(review): the production scheduler is not invoked here, so drift
+ // between it and this inline logic would not be caught.
+ int numDeltaCommitsSince = Math.min(deltasSinceCompaction,
deltasSinceLogCompaction);
+ assertFalse(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+ "Log compaction should not be scheduled when there is a pending log
compaction");
+ }
+
+ /**
+ * Verifies that log compaction is not scheduled when a compaction is pending
+ * and the number of delta commits since the last completed compaction (4)
+ * is below the threshold (5).
+ */
+ @Test
+ public void
testLogCompactionSchedulingWithPendingCompactionAndThresholdNotMet() {
+ int logCompactionBlocksThreshold = 5;
+ // Completed compaction at "03", pending (inflight) compaction at "06",
+ // 2 delta commits ("04", "05") between completed and pending compaction,
+ // plus 2 more ("07", "08") after pending compaction.
+ // No prior log compaction.
+ // Delta commits since last *completed* compaction = 4, which is below the
+ // threshold of 5.
+ HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(
+ Stream.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "03"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, "06"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "08"))
+ .collect(Collectors.toList()));
+ // No LOG_COMPACTION_ACTION instant exists, so the raw timeline is simply
+ // the same object as the active timeline.
+ HoodieActiveTimeline rawActiveTimeline = activeTimeline;
+
+ int deltasSinceCompaction =
CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(activeTimeline)
+ .get().getLeft().countInstants();
+ Option<Pair<HoodieTimeline, HoodieInstant>> logCompactionInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ activeTimeline.getDeltaCommitTimeline(), rawActiveTimeline);
+ // An absent result is treated as 0 delta commits since log compaction.
+ // NOTE(review): logCompactionInfo's state is never asserted in this test.
+ // If it is empty here, the minimum below is 0 and the final assertFalse
+ // passes regardless of the threshold, leaving assertEquals(4, ...) as the
+ // only check of the "4 < 5" scenario. Consider asserting it explicitly.
+ int deltasSinceLogCompaction = logCompactionInfo.isPresent() ?
logCompactionInfo.get().getLeft().countInstants() : 0;
+
+ assertEquals(4, deltasSinceCompaction);
+ // Re-implements the scheduler's gating inline: schedule only when
+ // min(deltas since compaction, deltas since log compaction) >= threshold.
+ int numDeltaCommitsSince = Math.min(deltasSinceCompaction,
deltasSinceLogCompaction);
+ assertFalse(numDeltaCommitsSince >= logCompactionBlocksThreshold,
+ "Log compaction should not be scheduled with pending compaction and
delta commits < threshold");
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testGetOldestInstantToKeepForCompaction(boolean
hasCompletedCompaction) {