kbuci commented on code in PR #18306:
URL: https://github.com/apache/hudi/pull/18306#discussion_r2998518431
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java:
##########
@@ -236,6 +236,93 @@ public void testLogCompactionOnMORTable() throws Exception
{
storageConf, Arrays.asList(dataGen.getPartitionPaths()));
}
+ /**
+ * Verify that log compaction is only scheduled when the number of delta
commits since the last
+ * compaction or log compaction meets or exceeds the
LogCompactionBlocksThreshold, and that
+ * the counter resets after each log compaction.
+ */
+ @Test
+ public void testLogCompactionSchedulingRespectsThreshold() throws Exception {
+ int logCompactionThreshold = 3;
+ HoodieCompactionConfig compactionConfig =
HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1)
+ .withLogCompactionBlocksThreshold(logCompactionThreshold)
+ .build();
+ HoodieWriteConfig config =
getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
+
HoodieIndex.IndexType.INMEMORY).withCompactionConfig(compactionConfig).build();
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+ // Insert
+ int expectedTotalRecs = 100;
+ String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ insertBatch(config, client, newCommitTime, "000", expectedTotalRecs,
+ SparkRDDWriteClient::insert, false, false, expectedTotalRecs,
expectedTotalRecs,
+ 1, Option.empty(), INSTANT_GENERATOR);
+
+ // Do enough upserts to trigger compaction
(maxNumDeltaCommitsBeforeCompaction=1)
+ String prevCommitTime = newCommitTime;
+ newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ expectedTotalRecs += 50;
+ updateBatch(config, client, newCommitTime, prevCommitTime,
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50,
SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, 2, config.populateMetaFields(),
INSTANT_GENERATOR);
+
+ // Schedule and execute compaction to establish a baseline
+ Option<String> compactionTimeStamp =
client.scheduleCompaction(Option.empty());
+ assertTrue(compactionTimeStamp.isPresent());
+ HoodieWriteMetadata result = client.compact(compactionTimeStamp.get());
+ client.commitCompaction(compactionTimeStamp.get(), result, Option.empty());
+
+ // Do (threshold - 1) upserts -- below threshold
+ prevCommitTime = compactionTimeStamp.get();
+ for (int i = 0; i < logCompactionThreshold - 1; i++) {
+ newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ expectedTotalRecs += 50;
+ updateBatch(config, client, newCommitTime, prevCommitTime,
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50,
SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, i + 3,
config.populateMetaFields(), INSTANT_GENERATOR);
+ prevCommitTime = newCommitTime;
+ }
+
+ // Log compaction should NOT be scheduled (below threshold)
+ Option<String> logCompactionTimeStamp =
client.scheduleLogCompaction(Option.empty());
+ assertFalse(logCompactionTimeStamp.isPresent(),
+ "Log compaction should not be scheduled when delta commits <
threshold");
+
+ // One more upsert to reach the threshold
+ newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ expectedTotalRecs += 50;
+ updateBatch(config, client, newCommitTime, prevCommitTime,
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50,
SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, logCompactionThreshold + 2,
config.populateMetaFields(), INSTANT_GENERATOR);
+ prevCommitTime = newCommitTime;
+
+ // Log compaction SHOULD be scheduled now (at threshold)
+ logCompactionTimeStamp = client.scheduleLogCompaction(Option.empty());
+ assertTrue(logCompactionTimeStamp.isPresent(),
+ "Log compaction should be scheduled when delta commits >= threshold");
+ result = client.logCompact(logCompactionTimeStamp.get());
+ client.commitLogCompaction(logCompactionTimeStamp.get(), result,
Option.empty());
+
+ // After log compaction, do (threshold - 1) upserts again -- below
threshold
+ prevCommitTime = logCompactionTimeStamp.get();
+ int totalCommitsAfterLogCompact = logCompactionThreshold + 3;
+ for (int i = 0; i < logCompactionThreshold - 1; i++) {
+ newCommitTime = WriteClientTestUtils.createNewInstantTime();
+ expectedTotalRecs += 50;
+ updateBatch(config, client, newCommitTime, prevCommitTime,
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50,
SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, totalCommitsAfterLogCompact + i,
+ config.populateMetaFields(), INSTANT_GENERATOR);
+ prevCommitTime = newCommitTime;
+ }
+
+ // Log compaction should NOT be scheduled again (counter reset, below
threshold)
Review Comment:
Sure — added tests for both a pending compaction and a pending log compaction (when
the log blocks threshold is not met).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]