This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 27217621a37 [HUDI-7572] Avoid to schedule empty compaction plan
without log files (#10974)
27217621a37 is described below
commit 27217621a37ed81d0718f5f8814aa828e8ab0b20
Author: Danny Chan <[email protected]>
AuthorDate: Tue Apr 9 17:18:18 2024 +0800
[HUDI-7572] Avoid to schedule empty compaction plan without log files
(#10974)
---
.../BaseHoodieCompactionPlanGenerator.java | 9 +++++--
.../table/action/compact/CompactionTestBase.java | 18 +++++++++++++
.../table/action/compact/TestAsyncCompaction.java | 30 +++++++++-------------
.../table/action/compact/TestInlineCompaction.java | 4 +--
4 files changed, 39 insertions(+), 22 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 2626bc59918..9f6fb0d786a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -151,7 +151,12 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
     LOG.info("Total number of file slices " + totalFileSlices.value());
     if (operations.isEmpty()) {
-      LOG.warn("No operations are retrieved for " + metaClient.getBasePath());
+      LOG.warn("No operations are retrieved for {}", metaClient.getBasePathV2());
+      return null;
+    }
+
+    if (totalLogFiles.value() <= 0) {
+      LOG.warn("No log files are retrieved for {}", metaClient.getBasePathV2());
       return null;
     }
@@ -164,7 +169,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
           + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
           + ", Selected workload :" + compactionPlan);
     if (compactionPlan.getOperations().isEmpty()) {
-      LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
+      LOG.warn("After filtering, Nothing to compact for {}", metaClient.getBasePathV2());
     }
     return compactionPlan;
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index e425c4c4352..973241ae592 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -54,6 +55,7 @@ import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -79,6 +81,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
             .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        .withProps(Collections.singletonMap(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name()))
         .withEmbeddedTimelineServerEnabled(true);
   }
@@ -164,6 +167,21 @@ public class CompactionTestBase extends HoodieClientTestBase {
     assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
   }
 
+  /**
+   * Tries to schedule a compaction plan and returns the latest pending compaction instant time.
+   *
+   * @param compactionInstantTime The given compaction instant time
+   * @param client                The write client
+   * @param cfg                   The write config
+   *
+   * @return The latest pending instant time.
+   */
+  protected String tryScheduleCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieWriteConfig cfg) {
+    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
+    return metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
+  }
+
   protected void scheduleAndExecuteCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieTable table,
                                               HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
     scheduleCompaction(compactionInstantTime, client, cfg);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 456fc31cb0d..7aa652c6eaf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -47,6 +46,8 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -242,7 +243,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
 
   @Test
   public void testScheduleIngestionBeforePendingCompaction() throws Exception {
-    // Case: Failure case. Latest pending compaction instant time can be earlier than this instant time
+    // Case: Non-serial case. Latest pending compaction instant time can be earlier than this instant time
HoodieWriteConfig cfg = getConfig(false);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
@@ -250,7 +251,6 @@ public class TestAsyncCompaction extends CompactionTestBase {
     String firstInstantTime = "001";
     String secondInstantTime = "004";
     String failedInstantTime = "005";
-    String compactionInstantTime = client.createNewInstantTime();
 
     int numRecs = 2000;
     final List<HoodieRecord> initialRecords = dataGen.generateInserts(firstInstantTime, numRecs);
@@ -258,6 +258,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
         new ArrayList<>());
 
     // Schedule compaction but do not run them
+    String compactionInstantTime = client.createNewInstantTime();
     scheduleCompaction(compactionInstantTime, client, cfg);
 
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant pendingCompactionInstant =
@@ -265,8 +266,8 @@ public class TestAsyncCompaction extends CompactionTestBase {
     assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
 
     assertDoesNotThrow(() -> {
-      runNextDeltaCommits(client, readClient, Arrays.asList(failedInstantTime), records, cfg, false,
-          Arrays.asList(compactionInstantTime));
+      runNextDeltaCommits(client, readClient, Collections.singletonList(failedInstantTime), records, cfg, false,
+          Collections.singletonList(compactionInstantTime));
     }, "Latest pending compaction instant time can be earlier than this instant time");
   }
@@ -319,22 +320,15 @@ public class TestAsyncCompaction extends CompactionTestBase {
     runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
         new ArrayList<>());
 
-    assertDoesNotThrow(() -> {
-      // Schedule compaction but do not run them
-      scheduleCompaction(compactionInstantTime, client, cfg);
-    }, "Compaction Instant can be scheduled with older timestamp");
+    // Schedule compaction but do not run them
+    assertNull(tryScheduleCompaction(compactionInstantTime, client, cfg), "Compaction Instant can be scheduled with older timestamp");
 
     // Schedule with timestamp same as that of committed instant
-    assertDoesNotThrow(() -> {
-      // Schedule compaction but do not run them
-      client.scheduleCompactionAtInstant(secondInstantTime, Option.empty());
-    }, "Compaction Instant to be scheduled can have same timestamp as committed instant");
+    assertNull(tryScheduleCompaction(secondInstantTime, client, cfg), "Compaction Instant to be scheduled can have same timestamp as committed instant");
 
-    final String compactionInstantTime2 = "006";
-    assertDoesNotThrow(() -> {
-      // Schedule compaction but do not run them
-      client.scheduleCompactionAtInstant(compactionInstantTime2, Option.empty());
-    }, "Compaction Instant can be scheduled with greater timestamp");
+    final String compactionInstantTime2 = client.createNewInstantTime();
+    // Schedule compaction but do not run them
+    assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg), "Compaction Instant can be scheduled with greater timestamp");
   }
@Test
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 6454f770e12..fd0d3556a9f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -54,7 +54,7 @@ public class TestInlineCompaction extends CompactionTestBase {
         .build();
   }
 
-  private HoodieWriteConfig getConfigDisableComapction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
+  private HoodieWriteConfig getConfigDisableCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
     return getConfigBuilder(false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -111,7 +111,7 @@ public class TestInlineCompaction extends CompactionTestBase {
   @Test
   public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() throws Exception {
     // Given: make 4 commits
-    HoodieWriteConfig cfg = getConfigDisableComapction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
+    HoodieWriteConfig cfg = getConfigDisableCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
     // turn off compaction table service to mock compaction service is down or very slow
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
       List<String> instants = IntStream.range(0, 4).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());