nsivabalan commented on a change in pull request #4974:
URL: https://github.com/apache/hudi/pull/4974#discussion_r822159191
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
##########
@@ -395,6 +399,18 @@ public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOExcept
     // made after the first savepoint present.
     Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
+      // For Merge-On-Read table, inline or async compaction is enabled
+      // We need to make sure that there are enough delta commits in the active timeline
+      // to trigger compaction scheduling, when the trigger strategy of compaction is
+      // NUM_COMMITS or NUM_AND_TIME.
+      Option<HoodieInstant> oldestInstantToKeepForCompaction =
Review comment:
minor: oldestInstantToRetain
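i.e., something along these lines. A naming-only sketch: the right-hand side is my guess based on the new CompactionUtils helper in this PR, since the hunk is truncated at the assignment:

```java
// sketch of the suggested rename; the call and its arguments are assumed, not from the hunk
Option<HoodieInstant> oldestInstantToRetain =
    CompactionUtils.getOldestInstantToKeepForCompaction(
        table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax());
```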
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
##########
@@ -946,6 +961,152 @@ public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(
+        enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ);
+
+    // When max archival commits is set to 4, even after 8 delta commits, since the number of delta
+    // commits is still smaller than 8, the archival should not kick in.
+    // The archival should only kick in after the 9th delta commit
+    // instant "00000001" to "00000009"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i <= 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+        IntStream.range(2, 10).forEach(j ->
+            assertTrue(commitsAfterArchival.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      }
+    }
+
+    testTable.doCompaction("00000010", Arrays.asList("p1", "p2"));
+
+    // instant "00000011" to "00000019"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      // first 9 delta commits before the completed compaction should be archived
+      IntStream.range(1, 10).forEach(j ->
Review comment:
guess we are doing this assertion for all iterations from 11 to 19. we can probably do it just once, at 11.
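e.g., a rough sketch of what I mean (untested):

```java
// run the cross-compaction assertion only on the first iteration ("00000011");
// the earlier delta commits remain archived for all later iterations anyway
if (i == 1) {
  IntStream.range(1, 10).forEach(j ->
      assertFalse(commitsAfterArchival.contains(
          new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
}
```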
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
##########
@@ -128,27 +129,25 @@ private HoodieCompactionPlan scheduleCompaction() {
     return new HoodieCompactionPlan();
   }
 
-  private Pair<Integer, String> getLatestDeltaCommitInfo() {
-    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
-    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
-
-    String latestInstantTs;
-    final int deltaCommitsSinceLastCompaction;
-    if (lastCompaction.isPresent()) {
-      latestInstantTs = lastCompaction.get().getTimestamp();
-      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
-    } else {
-      latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
-      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants();
+  private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().getTimestamp()));
     }
-    return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
+    return Option.empty();
   }
 
   private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo();
+    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = getLatestDeltaCommitInfo();
+    if (!latestDeltaCommitInfoOption.isPresent()) {
+      return false;
Review comment:
is this referring to an empty table where there are no delta commits at all?
does this also cover scenarios where there are no delta commits after the last compaction?
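for reference, my reading of the new CompactionUtils.getDeltaCommitsSinceLatestCompaction quoted below, sketched as comments (please correct me if I'm off):

```java
// Case 1: a completed compaction exists
//   -> Option.of(<delta commits after it>, <compaction instant>); present even when
//      zero delta commits landed after the compaction (the timeline in the pair is empty).
// Case 2: no completed compaction, at least one delta commit
//   -> Option.of(<all delta commits>, <first delta commit instant>)
// Case 3: no completed compaction and no delta commits (e.g., a brand-new empty table)
//   -> Option.empty(), the only case where needCompact() short-circuits to false.
```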
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
##########
@@ -195,10 +196,76 @@ public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaC
   /**
    * Return all pending compaction instant times.
-   * 
+   *
    * @return
    */
   public static List<HoodieInstant> getPendingCompactionInstantTimes(HoodieTableMetaClient metaClient) {
     return metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
   }
+
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest completed
+   * compaction commit, the completed compaction commit instant), if the latest completed
+   * compaction commit is present; a pair of (timeline containing all the delta commits,
+   * the first delta commit instant), if there is no completed compaction commit.
+   *
+   * @param activeTimeline Active timeline of a table.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompaction(
+      HoodieActiveTimeline activeTimeline) {
+    Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline()
+        .filterCompletedInstants().lastInstant();
+    HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline();
+
+    HoodieInstant latestInstant;
+    if (lastCompaction.isPresent()) {
+      latestInstant = lastCompaction.get();
+      // timeline containing the delta commits after the latest completed compaction commit,
+      // and the completed compaction commit instant
+      return Option.of(Pair.of(deltaCommits.findInstantsAfter(
+          latestInstant.getTimestamp(), Integer.MAX_VALUE), lastCompaction.get()));
+    } else {
+      if (deltaCommits.countInstants() > 0) {
+        latestInstant = deltaCommits.firstInstant().get();
+        // timeline containing all the delta commits, and the first delta commit instant
+        return Option.of(Pair.of(deltaCommits.findInstantsAfterOrEquals(
+            latestInstant.getTimestamp(), Integer.MAX_VALUE), latestInstant));
+      } else {
+        return Option.empty();
+      }
+    }
+  }
+
+  /**
+   * Gets the oldest instant to keep for MOR compaction.
+   * If there is no completed compaction,
+   * num delta commits >= "hoodie.compact.inline.max.delta.commits"
+   * If there is a completed compaction,
+   * num delta commits after latest completed compaction >= "hoodie.compact.inline.max.delta.commits"
+   *
+   * @param activeTimeline  Active timeline of a table.
+   * @param maxDeltaCommits Maximum number of delta commits that trigger the compaction plan,
+   *                        i.e., "hoodie.compact.inline.max.delta.commits".
+   * @return the oldest instant to keep for MOR compaction.
+   */
+  public static Option<HoodieInstant> getOldestInstantToKeepForCompaction(
+      HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {
Review comment:
should we not name this as getOldestInstantToKeepFor**Archival**?
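i.e., a sketch of the suggested rename (body unchanged):

```java
// hypothetical rename only; same logic as the method above
public static Option<HoodieInstant> getOldestInstantToKeepForArchival(
    HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {
  // ... same body ...
}
```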
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
##########
@@ -946,6 +961,152 @@ public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(
+        enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ);
+
+    // When max archival commits is set to 4, even after 8 delta commits, since the number of delta
+    // commits is still smaller than 8, the archival should not kick in.
+    // The archival should only kick in after the 9th delta commit
+    // instant "00000001" to "00000009"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i <= 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+        IntStream.range(2, 10).forEach(j ->
+            assertTrue(commitsAfterArchival.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      }
+    }
+
+    testTable.doCompaction("00000010", Arrays.asList("p1", "p2"));
+
+    // instant "00000011" to "00000019"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      // first 9 delta commits before the completed compaction should be archived
+      IntStream.range(1, 10).forEach(j ->
+          assertFalse(commitsAfterArchival.contains(
+              new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+
+      if (i == 1) {
+        assertEquals(8, originalCommits.size() - commitsAfterArchival.size());
+        // instant from "00000011" should be in the active timeline
+        assertTrue(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010")));
+        assertTrue(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000011")));
+      } else if (i < 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010")));
+        // i == 8 -> ["00000011", "00000018"] should be in the active timeline
Review comment:
is this comment valid?
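if it is, we could pin it down with an explicit assertion, something like (sketch, reusing the test's own helpers):

```java
// verify "00000011" through "00000018" are still in the active timeline when i == 8
IntStream.rangeClosed(11, 18).forEach(j ->
    assertTrue(commitsAfterArchival.contains(
        new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "000000" + j))));
```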
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]