nsivabalan commented on code in PR #18306:
URL: https://github.com/apache/hudi/pull/18306#discussion_r2977749679
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +367,67 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLa
}
}
+ /**
+ * Returns the most recent log compaction instant from the raw active
timeline.
+ *
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return The latest log compaction instant, or empty if no log compaction
is present.
+ */
+ public static Option<HoodieInstant> getLastLogCompaction(final
HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionInstantOption =
Option.fromJavaOptional(
Review Comment:
`lastLogCompactionInstantOpt`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,69 @@ private Option<Pair<Integer, String>>
getLatestDeltaCommitInfoSinceLastCompactio
return Option.empty();
}
+ private Option<Pair<Integer, String>>
getLatestDeltaCommitInfoSinceLogCompaction() {
+ HoodieActiveTimeline rawActiveTimeline =
table.getMetaClient().getTableFormat()
+ .getTimelineFactory().createActiveTimeline(table.getMetaClient(),
false);
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+ table.getActiveTimeline().getDeltaCommitTimeline(),
rawActiveTimeline);
+ if (deltaCommitsInfo.isPresent()) {
+ return Option.of(Pair.of(
+ deltaCommitsInfo.get().getLeft().countInstants(),
+ deltaCommitsInfo.get().getRight().requestedTime()));
+ }
+ return Option.empty();
+ }
+
private boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
- Option<Pair<Integer, String>> latestDeltaCommitInfoOption =
getLatestDeltaCommitInfo();
- if (!latestDeltaCommitInfoOption.isPresent()) {
+ Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption =
getLatestDeltaCommitInfoSinceCompaction();
+ if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
return false;
}
- Pair<Integer, String> latestDeltaCommitInfo =
latestDeltaCommitInfoOption.get();
+ Pair<Integer, String> latestDeltaCommitInfoSinceCompact =
latestDeltaCommitInfoSinceCompactOption.get();
if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
- return true;
+ return needLogCompact(latestDeltaCommitInfoSinceCompact);
Review Comment:
Thanks — this looks neat.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +367,67 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLa
}
}
+ /**
+ * Returns the most recent log compaction instant from the raw active
timeline.
+ *
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return The latest log compaction instant, or empty if no log compaction
is present.
+ */
+ public static Option<HoodieInstant> getLastLogCompaction(final
HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionInstantOption =
Option.fromJavaOptional(
+ rawActiveTimeline
+ .filterPendingLogCompactionTimeline()
+ .getReverseOrderedInstants()
+ .findFirst()
+ );
+ if (!lastLogCompactionInstantOption.isPresent()) {
+ return Option.empty();
+ }
+ String logCompactionTimestamp =
lastLogCompactionInstantOption.get().requestedTime();
+ Option<HoodieInstant> completedInstant = Option.fromJavaOptional(
+ rawActiveTimeline
+ .findInstantsInClosedRange(logCompactionTimestamp,
logCompactionTimestamp)
+ .getInstantsAsStream()
+ .filter(HoodieInstant::isCompleted)
+ .findFirst()
+ );
+ return completedInstant.isPresent() ? completedInstant :
lastLogCompactionInstantOption;
+ }
+
+ /**
+ * Returns a pair of (timeline containing the delta commits after the latest
completed
+ * log compaction delta commit, the completed log compaction commit
instant), if the latest completed
+ * log compaction commit is present; a pair of (timeline containing all the
delta commits,
+ * the first delta commit instant), if there is no completed log compaction
commit.
+ *
+ * @param deltaCommitTimeline Active timeline of table that contains only
delta commits.
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return Pair of timeline containing delta commits and an instant.
+ */
+ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLatestCompletedLogCompaction(
+ final HoodieTimeline deltaCommitTimeline,
+ final HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionOption =
getLastLogCompaction(rawActiveTimeline);
Review Comment:
For longer names, we can abbreviate the `Option` suffix to `Opt`, e.g.
`lastLogCompactionOpt`.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java:
##########
@@ -236,6 +236,93 @@ public void testLogCompactionOnMORTable() throws Exception
{
storageConf, Arrays.asList(dataGen.getPartitionPaths()));
}
+ /**
+ * Verify that log compaction is only scheduled when the number of delta
commits since the last
+ * compaction or log compaction meets or exceeds the
LogCompactionBlocksThreshold, and that
+ * the counter resets after each log compaction.
+ */
+ @Test
+ public void testLogCompactionSchedulingRespectsThreshold() throws Exception {
+ int logCompactionThreshold = 3;
+ HoodieCompactionConfig compactionConfig =
HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1)
+ .withLogCompactionBlocksThreshold(logCompactionThreshold)
+ .build();
+ HoodieWriteConfig config =
getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
+
HoodieIndex.IndexType.INMEMORY).withCompactionConfig(compactionConfig).build();
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+ // Insert
+ int expectedTotalRecs = 100;
+ String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ insertBatch(config, client, newCommitTime, "000", expectedTotalRecs,
+ SparkRDDWriteClient::insert, false, false, expectedTotalRecs,
expectedTotalRecs,
+ 1, Option.empty(), INSTANT_GENERATOR);
+
+ // Do enough upserts to trigger compaction
(maxNumDeltaCommitsBeforeCompaction=1)
+ String prevCommitTime = newCommitTime;
+ newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ expectedTotalRecs += 50;
+ updateBatch(config, client, newCommitTime, prevCommitTime,
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50,
SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, 2, config.populateMetaFields(),
INSTANT_GENERATOR);
+
+ // Schedule and execute compaction to establish a baseline
+ Option<String> compactionTimeStamp =
client.scheduleCompaction(Option.empty());
+ assertTrue(compactionTimeStamp.isPresent());
+ HoodieWriteMetadata result = client.compact(compactionTimeStamp.get());
+ client.commitCompaction(compactionTimeStamp.get(), result, Option.empty());
+
+ // Do (threshold - 1) upserts -- below threshold
+ prevCommitTime = compactionTimeStamp.get();
+ for (int i = 0; i < logCompactionThreshold - 1; i++) {
+ newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ expectedTotalRecs += 50;
+ updateBatch(config, client, newCommitTime, prevCommitTime,
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50,
SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, i + 3,
config.populateMetaFields(), INSTANT_GENERATOR);
+ prevCommitTime = newCommitTime;
+ }
+
+ // Log compaction should NOT be scheduled (below threshold)
+ Option<String> logCompactionTimeStamp =
client.scheduleLogCompaction(Option.empty());
+ assertFalse(logCompactionTimeStamp.isPresent(),
+ "Log compaction should not be scheduled when delta commits <
threshold");
+
+ // One more upsert to reach the threshold
+ newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ expectedTotalRecs += 50;
+ updateBatch(config, client, newCommitTime, prevCommitTime,
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50,
SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, logCompactionThreshold + 2,
config.populateMetaFields(), INSTANT_GENERATOR);
+ prevCommitTime = newCommitTime;
+
+ // Log compaction SHOULD be scheduled now (at threshold)
+ logCompactionTimeStamp = client.scheduleLogCompaction(Option.empty());
+ assertTrue(logCompactionTimeStamp.isPresent(),
+ "Log compaction should be scheduled when delta commits >= threshold");
+ result = client.logCompact(logCompactionTimeStamp.get());
+ client.commitLogCompaction(logCompactionTimeStamp.get(), result,
Option.empty());
+
+ // After log compaction, do (threshold - 1) upserts again -- below
threshold
+ prevCommitTime = logCompactionTimeStamp.get();
+ int totalCommitsAfterLogCompact = logCompactionThreshold + 3;
+ for (int i = 0; i < logCompactionThreshold - 1; i++) {
+ newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ expectedTotalRecs += 50;
+ updateBatch(config, client, newCommitTime, prevCommitTime,
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50,
SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, totalCommitsAfterLogCompact + i,
+ config.populateMetaFields(), INSTANT_GENERATOR);
+ prevCommitTime = newCommitTime;
+ }
+
+ // Log compaction should NOT be scheduled again (counter reset, below
threshold)
Review Comment:
Do we have a test where there is a pending compaction and the log blocks threshold
is not met?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +367,67 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLa
}
}
+ /**
+ * Returns the most recent log compaction instant from the raw active
timeline.
+ *
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return The latest log compaction instant, or empty if no log compaction
is present.
+ */
+ public static Option<HoodieInstant> getLastLogCompaction(final
HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionInstantOption =
Option.fromJavaOptional(
+ rawActiveTimeline
+ .filterPendingLogCompactionTimeline()
+ .getReverseOrderedInstants()
+ .findFirst()
+ );
+ if (!lastLogCompactionInstantOption.isPresent()) {
+ return Option.empty();
+ }
+ String logCompactionTimestamp =
lastLogCompactionInstantOption.get().requestedTime();
+ Option<HoodieInstant> completedInstant = Option.fromJavaOptional(
Review Comment:
`completedInstantOpt`
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java:
##########
@@ -236,6 +236,93 @@ public void testLogCompactionOnMORTable() throws Exception
{
storageConf, Arrays.asList(dataGen.getPartitionPaths()));
}
+ /**
+ * Verify that log compaction is only scheduled when the number of delta
commits since the last
+ * compaction or log compaction meets or exceeds the
LogCompactionBlocksThreshold, and that
+ * the counter resets after each log compaction.
+ */
+ @Test
+ public void testLogCompactionSchedulingRespectsThreshold() throws Exception {
+ int logCompactionThreshold = 3;
Review Comment:
Instead of an end-to-end functional test, can we try mocking the timeline entries
and writing tests based on that? It would lower test runtimes, and we could cover all the different
combinations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]