This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 27217621a37 [HUDI-7572] Avoid scheduling an empty compaction plan without log files (#10974)
27217621a37 is described below

commit 27217621a37ed81d0718f5f8814aa828e8ab0b20
Author: Danny Chan <[email protected]>
AuthorDate: Tue Apr 9 17:18:18 2024 +0800

    [HUDI-7572] Avoid scheduling an empty compaction plan without log files (#10974)
---
 .../BaseHoodieCompactionPlanGenerator.java         |  9 +++++--
 .../table/action/compact/CompactionTestBase.java   | 18 +++++++++++++
 .../table/action/compact/TestAsyncCompaction.java  | 30 +++++++++-------------
 .../table/action/compact/TestInlineCompaction.java |  4 +--
 4 files changed, 39 insertions(+), 22 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 2626bc59918..9f6fb0d786a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -151,7 +151,12 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     LOG.info("Total number of file slices " + totalFileSlices.value());
 
     if (operations.isEmpty()) {
-      LOG.warn("No operations are retrieved for " + metaClient.getBasePath());
+      LOG.warn("No operations are retrieved for {}", 
metaClient.getBasePathV2());
+      return null;
+    }
+
+    if (totalLogFiles.value() <= 0) {
+      LOG.warn("No log files are retrieved for {}", 
metaClient.getBasePathV2());
       return null;
     }
 
@@ -164,7 +169,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
             + "Please fix your strategy implementation. 
FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
             + ", Selected workload :" + compactionPlan);
     if (compactionPlan.getOperations().isEmpty()) {
-      LOG.warn("After filtering, Nothing to compact for " + 
metaClient.getBasePath());
+      LOG.warn("After filtering, Nothing to compact for {}", 
metaClient.getBasePathV2());
     }
     return compactionPlan;
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index e425c4c4352..973241ae592 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -54,6 +55,7 @@ import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -79,6 +81,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
             .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 
1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
         .forTable("test-trip-table")
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        .withProps(Collections.singletonMap(HoodieTableConfig.TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name()))
         .withEmbeddedTimelineServerEnabled(true);
   }
 
@@ -164,6 +167,21 @@ public class CompactionTestBase extends 
HoodieClientTestBase {
     assertEquals(compactionInstantTime, instant.getTimestamp(), "Last 
compaction instant must be the one set");
   }
 
+  /**
+   * Tries to schedule a compaction plan and returns the latest pending 
compaction instant time.
+   *
+   * @param compactionInstantTime The given compaction instant time
+   * @param client                The write client
+   * @param cfg                   The write config
+   *
+   * @return The latest pending instant time.
+   */
+  protected String tryScheduleCompaction(String compactionInstantTime, 
SparkRDDWriteClient client, HoodieWriteConfig cfg) {
+    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
+    return 
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
+  }
+
   protected void scheduleAndExecuteCompaction(String compactionInstantTime, 
SparkRDDWriteClient client, HoodieTable table,
                                             HoodieWriteConfig cfg, int 
expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws 
IOException {
     scheduleCompaction(compactionInstantTime, client, cfg);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 456fc31cb0d..7aa652c6eaf 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -47,6 +46,8 @@ import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -242,7 +243,7 @@ public class TestAsyncCompaction extends CompactionTestBase 
{
 
   @Test
   public void testScheduleIngestionBeforePendingCompaction() throws Exception {
-    // Case: Failure case. Latest pending compaction instant time can be 
earlier than this instant time
+    // Case: Non-serial case. Latest pending compaction instant time can be 
earlier than this instant time
     HoodieWriteConfig cfg = getConfig(false);
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
     SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
@@ -250,7 +251,6 @@ public class TestAsyncCompaction extends CompactionTestBase 
{
     String firstInstantTime = "001";
     String secondInstantTime = "004";
     String failedInstantTime = "005";
-    String compactionInstantTime = client.createNewInstantTime();
     int numRecs = 2000;
 
     final List<HoodieRecord> initialRecords = 
dataGen.generateInserts(firstInstantTime, numRecs);
@@ -258,6 +258,7 @@ public class TestAsyncCompaction extends CompactionTestBase 
{
         new ArrayList<>());
 
     // Schedule compaction but do not run them
+    String compactionInstantTime = client.createNewInstantTime();
     scheduleCompaction(compactionInstantTime, client, cfg);
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant pendingCompactionInstant =
@@ -265,8 +266,8 @@ public class TestAsyncCompaction extends CompactionTestBase 
{
     assertEquals(compactionInstantTime, 
pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has 
expected instant time");
 
     assertDoesNotThrow(() -> {
-      runNextDeltaCommits(client, readClient, 
Arrays.asList(failedInstantTime), records, cfg, false,
-          Arrays.asList(compactionInstantTime));
+      runNextDeltaCommits(client, readClient, 
Collections.singletonList(failedInstantTime), records, cfg, false,
+          Collections.singletonList(compactionInstantTime));
     }, "Latest pending compaction instant time can be earlier than this 
instant time");
   }
 
@@ -319,22 +320,15 @@ public class TestAsyncCompaction extends 
CompactionTestBase {
     runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, 
secondInstantTime), records, cfg, true,
         new ArrayList<>());
 
-    assertDoesNotThrow(() -> {
-      // Schedule compaction but do not run them
-      scheduleCompaction(compactionInstantTime, client, cfg);
-    }, "Compaction Instant can be scheduled with older timestamp");
+    // Schedule compaction but do not run them
+    assertNull(tryScheduleCompaction(compactionInstantTime, client, cfg), 
"Compaction Instant can be scheduled with older timestamp");
 
     // Schedule with timestamp same as that of committed instant
-    assertDoesNotThrow(() -> {
-      // Schedule compaction but do not run them
-      client.scheduleCompactionAtInstant(secondInstantTime, Option.empty());
-    }, "Compaction Instant to be scheduled can have same timestamp as 
committed instant");
+    assertNull(tryScheduleCompaction(secondInstantTime, client, cfg), 
"Compaction Instant to be scheduled can have same timestamp as committed 
instant");
 
-    final String compactionInstantTime2 = "006";
-    assertDoesNotThrow(() -> {
-      // Schedule compaction but do not run them
-      client.scheduleCompactionAtInstant(compactionInstantTime2, 
Option.empty());
-    }, "Compaction Instant can be scheduled with greater timestamp");
+    final String compactionInstantTime2 = client.createNewInstantTime();
+    // Schedule compaction but do not run them
+    assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg), 
"Compaction Instant can be scheduled with greater timestamp");
   }
 
   @Test
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 6454f770e12..fd0d3556a9f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -54,7 +54,7 @@ public class TestInlineCompaction extends CompactionTestBase {
         .build();
   }
 
-  private HoodieWriteConfig getConfigDisableComapction(int maxDeltaCommits, 
int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
+  private HoodieWriteConfig getConfigDisableCompaction(int maxDeltaCommits, 
int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
     return getConfigBuilder(false)
           
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
           .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -111,7 +111,7 @@ public class TestInlineCompaction extends 
CompactionTestBase {
   @Test
   public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() 
throws Exception {
     // Given: make 4 commits
-    HoodieWriteConfig cfg = getConfigDisableComapction(4, 60, 
CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
+    HoodieWriteConfig cfg = getConfigDisableCompaction(4, 60, 
CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
     // turn off compaction table service to mock compaction service is down or 
very slow
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
       List<String> instants = IntStream.range(0, 4).mapToObj(i -> 
writeClient.createNewInstantTime()).collect(Collectors.toList());

Reply via email to