This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7c0f9ac7965c20c4b6fe5dd66c1018c038269d84
Author: Danny Chan <[email protected]>
AuthorDate: Tue May 14 16:31:25 2024 -0700

    [HUDI-7572] Avoid scheduling an empty compaction plan without log files 
(#10974)
---
 .../BaseHoodieCompactionPlanGenerator.java         |  9 ++++-
 .../table/action/compact/CompactionTestBase.java   | 18 +++++++++
 .../table/action/compact/TestAsyncCompaction.java  | 43 ++++++++++------------
 .../table/action/compact/TestInlineCompaction.java |  4 +-
 4 files changed, 47 insertions(+), 27 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 2c92c3b87cb..2d528227797 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -136,7 +136,12 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     LOG.info("Total number of file slices " + totalFileSlices.value());
 
     if (operations.isEmpty()) {
-      LOG.warn("No operations are retrieved for " + metaClient.getBasePath());
+      LOG.warn("No operations are retrieved for {}", 
metaClient.getBasePathV2());
+      return null;
+    }
+
+    if (totalLogFiles.value() <= 0) {
+      LOG.warn("No log files are retrieved for {}", 
metaClient.getBasePathV2());
       return null;
     }
 
@@ -149,7 +154,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
             + "Please fix your strategy implementation. 
FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
             + ", Selected workload :" + compactionPlan);
     if (compactionPlan.getOperations().isEmpty()) {
-      LOG.warn("After filtering, Nothing to compact for " + 
metaClient.getBasePath());
+      LOG.warn("After filtering, Nothing to compact for {}", 
metaClient.getBasePathV2());
     }
     return compactionPlan;
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 5596b433d4f..47e1420a9dc 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -53,6 +54,7 @@ import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +80,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
             .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 
1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
         .forTable("test-trip-table")
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        .withProps(Collections.singletonMap(HoodieTableConfig.TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name()))
         .withEmbeddedTimelineServerEnabled(true);
   }
 
@@ -163,6 +166,21 @@ public class CompactionTestBase extends 
HoodieClientTestBase {
     assertEquals(compactionInstantTime, instant.getTimestamp(), "Last 
compaction instant must be the one set");
   }
 
+  /**
+   * Tries to schedule a compaction plan and returns the latest pending 
compaction instant time.
+   *
+   * @param compactionInstantTime The given compaction instant time
+   * @param client                The write client
+   * @param cfg                   The write config
+   *
+   * @return The latest pending instant time.
+   */
+  protected String tryScheduleCompaction(String compactionInstantTime, 
SparkRDDWriteClient client, HoodieWriteConfig cfg) {
+    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
+    return 
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
+  }
+
   protected void scheduleAndExecuteCompaction(String compactionInstantTime, 
SparkRDDWriteClient client, HoodieTable table,
                                             HoodieWriteConfig cfg, int 
expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws 
IOException {
     scheduleCompaction(compactionInstantTime, client, cfg);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index cf915b4c14a..0d3804720ac 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -38,13 +39,16 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -194,7 +198,7 @@ public class TestAsyncCompaction extends CompactionTestBase 
{
 
   @Test
   public void testScheduleIngestionBeforePendingCompaction() throws Exception {
-    // Case: Failure case. Latest pending compaction instant time must be 
earlier than this instant time
+    // Case: Non-serial case. Latest pending compaction instant time can be 
earlier than this instant time
     HoodieWriteConfig cfg = getConfig(false);
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
     SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
@@ -210,16 +214,17 @@ public class TestAsyncCompaction extends 
CompactionTestBase {
         new ArrayList<>());
 
     // Schedule compaction but do not run them
-    scheduleCompaction(compactionInstantTime, client, cfg);
+    String compactInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    scheduleCompaction(compactInstantTime, client, cfg);
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant pendingCompactionInstant =
         
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-    assertEquals(compactionInstantTime, 
pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has 
expected instant time");
+    assertEquals(compactInstantTime, pendingCompactionInstant.getTimestamp(), 
"Pending Compaction instant has expected instant time");
 
-    assertThrows(IllegalArgumentException.class, () -> {
-      runNextDeltaCommits(client, readClient, 
Arrays.asList(failedInstantTime), records, cfg, false,
-          Arrays.asList(compactionInstantTime));
-    }, "Latest pending compaction instant time must be earlier than this 
instant time");
+    assertDoesNotThrow(() -> {
+      runNextDeltaCommits(client, readClient, 
Collections.singletonList(failedInstantTime), records, cfg, false,
+          Collections.singletonList(compactInstantTime));
+    }, "Latest pending compaction instant time can be earlier than this 
instant time");
   }
 
   @Test
@@ -272,23 +277,15 @@ public class TestAsyncCompaction extends 
CompactionTestBase {
     runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, 
secondInstantTime), records, cfg, true,
         new ArrayList<>());
 
-    assertThrows(IllegalArgumentException.class, () -> {
-      // Schedule compaction but do not run them
-      scheduleCompaction(compactionInstantTime, client, cfg);
-    }, "Compaction Instant to be scheduled cannot have older timestamp");
+    // Schedule compaction but do not run them
+    assertNull(tryScheduleCompaction(compactionInstantTime, client, cfg), 
"Compaction Instant can be scheduled with older timestamp");
 
     // Schedule with timestamp same as that of committed instant
-    assertThrows(IllegalArgumentException.class, () -> {
-      // Schedule compaction but do not run them
-      scheduleCompaction(secondInstantTime, client, cfg);
-    }, "Compaction Instant to be scheduled cannot have same timestamp as 
committed instant");
-
-    final String compactionInstantTime2 = "006";
-    scheduleCompaction(compactionInstantTime2, client, cfg);
-    assertThrows(IllegalArgumentException.class, () -> {
-      // Schedule compaction with the same times as a pending compaction
-      scheduleCompaction(secondInstantTime, client, cfg);
-    }, "Compaction Instant to be scheduled cannot have same timestamp as a 
pending compaction");
+    assertNull(tryScheduleCompaction(secondInstantTime, client, cfg), 
"Compaction Instant to be scheduled can have same timestamp as committed 
instant");
+
+    final String compactionInstantTime2 = 
HoodieActiveTimeline.createNewInstantTime();
+    // Schedule compaction but do not run them
+    assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg), 
"Compaction Instant can be scheduled with greater timestamp");
   }
 
   @Test
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 9e7d1b2f666..3ab6580e72b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -55,7 +55,7 @@ public class TestInlineCompaction extends CompactionTestBase {
         .build();
   }
 
-  private HoodieWriteConfig getConfigDisableComapction(int maxDeltaCommits, 
int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
+  private HoodieWriteConfig getConfigDisableCompaction(int maxDeltaCommits, 
int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
     return getConfigBuilder(false)
           
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
           .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -111,7 +111,7 @@ public class TestInlineCompaction extends 
CompactionTestBase {
   @Test
   public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() 
throws Exception {
     // Given: make 4 commits
-    HoodieWriteConfig cfg = getConfigDisableComapction(4, 60, 
CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
+    HoodieWriteConfig cfg = getConfigDisableCompaction(4, 60, 
CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
     // turn off compaction table service to mock compaction service is down or 
very slow
     List<String> instants = IntStream.range(0, 4).mapToObj(i -> 
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
 

Reply via email to