This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7c0f9ac7965c20c4b6fe5dd66c1018c038269d84 Author: Danny Chan <[email protected]> AuthorDate: Tue May 14 16:31:25 2024 -0700 [HUDI-7572] Avoid to schedule empty compaction plan without log files (#10974) --- .../BaseHoodieCompactionPlanGenerator.java | 9 ++++- .../table/action/compact/CompactionTestBase.java | 18 +++++++++ .../table/action/compact/TestAsyncCompaction.java | 43 ++++++++++------------ .../table/action/compact/TestInlineCompaction.java | 4 +- 4 files changed, 47 insertions(+), 27 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java index 2c92c3b87cb..2d528227797 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java @@ -136,7 +136,12 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa LOG.info("Total number of file slices " + totalFileSlices.value()); if (operations.isEmpty()) { - LOG.warn("No operations are retrieved for " + metaClient.getBasePath()); + LOG.warn("No operations are retrieved for {}", metaClient.getBasePathV2()); + return null; + } + + if (totalLogFiles.value() <= 0) { + LOG.warn("No log files are retrieved for {}", metaClient.getBasePathV2()); return null; } @@ -149,7 +154,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering + ", Selected workload :" + compactionPlan); if (compactionPlan.getOperations().isEmpty()) { - LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); + LOG.warn("After filtering, Nothing to compact for {}", metaClient.getBasePathV2()); } return compactionPlan; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java index 5596b433d4f..47e1420a9dc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -53,6 +54,7 @@ import org.apache.spark.api.java.JavaRDD; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -78,6 +80,7 @@ public class CompactionTestBase extends HoodieClientTestBase { .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build()) .forTable("test-trip-table") .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withProps(Collections.singletonMap(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name())) .withEmbeddedTimelineServerEnabled(true); } @@ -163,6 +166,21 @@ public class CompactionTestBase extends HoodieClientTestBase { assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set"); } + /** + * Tries to schedule a compaction plan and returns the latest pending compaction instant time. + * + * @param compactionInstantTime The given compaction instant time + * @param client The write client + * @param cfg The write config + * + * @return The latest pending instant time. + */ + protected String tryScheduleCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieWriteConfig cfg) { + client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build(); + return metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().map(HoodieInstant::getTimestamp).orElse(null); + } + protected void scheduleAndExecuteCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieTable table, HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException { scheduleCompaction(compactionInstantTime, client, cfg); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java index cf915b4c14a..0d3804720ac 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -38,13 +39,16 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -194,7 +198,7 @@ public class TestAsyncCompaction extends CompactionTestBase { @Test public void testScheduleIngestionBeforePendingCompaction() throws Exception { - // Case: Failure case. Latest pending compaction instant time must be earlier than this instant time + // Case: Non-serial case. Latest pending compaction instant time can be earlier than this instant time HoodieWriteConfig cfg = getConfig(false); SparkRDDWriteClient client = getHoodieWriteClient(cfg); SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath()); @@ -210,16 +214,17 @@ public class TestAsyncCompaction extends CompactionTestBase { new ArrayList<>()); // Schedule compaction but do not run them - scheduleCompaction(compactionInstantTime, client, cfg); + String compactInstantTime = HoodieActiveTimeline.createNewInstantTime(); + scheduleCompaction(compactInstantTime, client, cfg); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build(); HoodieInstant pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); - assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time"); + assertEquals(compactInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time"); - assertThrows(IllegalArgumentException.class, () -> { - runNextDeltaCommits(client, readClient, Arrays.asList(failedInstantTime), records, cfg, false, - Arrays.asList(compactionInstantTime)); - }, "Latest pending compaction instant time must be earlier than this instant time"); + assertDoesNotThrow(() -> { + runNextDeltaCommits(client, readClient, Collections.singletonList(failedInstantTime), records, cfg, false, + Collections.singletonList(compactInstantTime)); + }, "Latest pending compaction instant time can be earlier than this instant time"); } @Test @@ -272,23 +277,15 @@ public class TestAsyncCompaction extends CompactionTestBase { runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); - assertThrows(IllegalArgumentException.class, () -> { - // Schedule compaction but do not run them - scheduleCompaction(compactionInstantTime, client, cfg); - }, "Compaction Instant to be scheduled cannot have older timestamp"); + // Schedule compaction but do not run them + assertNull(tryScheduleCompaction(compactionInstantTime, client, cfg), "Compaction Instant can be scheduled with older timestamp"); // Schedule with timestamp same as that of committed instant - assertThrows(IllegalArgumentException.class, () -> { - // Schedule compaction but do not run them - scheduleCompaction(secondInstantTime, client, cfg); - }, "Compaction Instant to be scheduled cannot have same timestamp as committed instant"); - - final String compactionInstantTime2 = "006"; - scheduleCompaction(compactionInstantTime2, client, cfg); - assertThrows(IllegalArgumentException.class, () -> { - // Schedule compaction with the same times as a pending compaction - scheduleCompaction(secondInstantTime, client, cfg); - }, "Compaction Instant to be scheduled cannot have same timestamp as a pending compaction"); + assertNull(tryScheduleCompaction(secondInstantTime, client, cfg), "Compaction Instant to be scheduled can have same timestamp as committed instant"); + + final String compactionInstantTime2 = HoodieActiveTimeline.createNewInstantTime(); + // Schedule compaction but do not run them + assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg), "Compaction Instant can be scheduled with greater timestamp"); } @Test diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java index 9e7d1b2f666..3ab6580e72b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java @@ -55,7 +55,7 @@ public class TestInlineCompaction extends CompactionTestBase { .build(); } - private HoodieWriteConfig getConfigDisableComapction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) { + private HoodieWriteConfig getConfigDisableCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) { return getConfigBuilder(false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -111,7 +111,7 @@ public class TestInlineCompaction extends CompactionTestBase { @Test public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() throws Exception { // Given: make 4 commits - HoodieWriteConfig cfg = getConfigDisableComapction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST); + HoodieWriteConfig cfg = getConfigDisableCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST); // turn off compaction table service to mock compaction service is down or very slow List<String> instants = IntStream.range(0, 4).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
