danny0405 commented on code in PR #6100:
URL: https://github.com/apache/hudi/pull/6100#discussion_r931948444
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java:
##########
@@ -149,6 +151,73 @@ void testScheduleCompaction() throws Exception {
assertThat("Two compaction plan expects to be scheduled",
numCompactionCommits, is(2));
}
+ @Test
+ void testGetScheduleCompactionInstant() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+ options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(),
FlinkOptions.TIME_ELAPSED);
+ options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
+ beforeEach(options);
+
+ // write a commit with data first
+ TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf);
+
+ HoodieFlinkWriteClient<?> writeClient =
StreamerUtil.createWriteClient(conf);
+ CompactionUtil.scheduleCompaction(metaClient, writeClient, true, true);
+
+ Option<HoodieInstant> pendingCompactionInstant =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant();
+ assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan
expects to be scheduled");
+
+ // write another commit with data and start a new instant
+ TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+ TimeUnit.SECONDS.sleep(3); // in case the instant time interval is too close
+ writeClient.startCommit();
+
+ CompactionUtil.scheduleCompaction(metaClient, writeClient, true, false);
+ Option<HoodieInstant> lastPendingCompactionInstant =
CompactionUtil.getScheduleCompactionInstant(table, conf);
+
assertEquals(table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().firstInstant(),
lastPendingCompactionInstant);
+ conf.set(FlinkOptions.COMPACTION_SCHEDULE_SEQ, "LIFO");
+ Option<HoodieInstant> firstPendingCompactionInstant =
CompactionUtil.getScheduleCompactionInstant(table, conf);
+
assertEquals(table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant(),
firstPendingCompactionInstant);
+ }
+
+ @Test
+ void testGetScheduleCompactionInstantWithDynamicSeq() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+ options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(),
FlinkOptions.TIME_ELAPSED);
+ options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
+ options.put(FlinkOptions.COMPACTION_SCHEDULE_SEQ.key(), "DYNAMIC");
+ options.put(FlinkOptions.PENDING_PLAN_NUM.key(), "3");
+ beforeEach(options);
+
+ createCompactPlan(2);
+ // pending plan num smaller than PENDING_PLAN_NUM, schedule FIFO
+ Option<HoodieInstant> firstPendingCompactionInstant =
CompactionUtil.getScheduleCompactionInstant(table, conf);
+
assertEquals(table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().firstInstant(),
firstPendingCompactionInstant);
+
+ createCompactPlan(3);
+ // pending plan num bigger than PENDING_PLAN_NUM, schedule LIFO
+ Option<HoodieInstant> lastPendingCompactionInstant =
CompactionUtil.getScheduleCompactionInstant(table, conf);
+
assertEquals(table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant(),
lastPendingCompactionInstant);
+ }
+
+ public void createCompactPlan(int num) {
+ IntStream.range(0, num).forEach(i -> {
+ HoodieCompactionOperation operation = new HoodieCompactionOperation();
Review Comment:
Can the method `generateCompactionPlan` be reused?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]