This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eb7cc5e2cbcb fix: Fix the timeline compaction blocked caused by the 
archived file being too large (#17784)
eb7cc5e2cbcb is described below

commit eb7cc5e2cbcb03a7b57fad0a38586b16b01012cc
Author: chaoyang <[email protected]>
AuthorDate: Wed Jan 7 15:52:28 2026 +0800

    fix: Fix the timeline compaction blocked caused by the archived file being 
too large (#17784)
    
    * fix: Fix the timeline compaction blocked caused by the L0 file being too 
large
    
    ---------
    
    Signed-off-by: TheR1sing3un <[email protected]>
---
 .../timeline/versioning/v2/LSMTimelineWriter.java  | 28 ++++++++++++++++------
 .../apache/hudi/config/HoodieArchivalConfig.java   |  5 ++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++++
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 22 +++++++++++++++++
 4 files changed, 52 insertions(+), 7 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
index 7641ab5ad02b..3ce2078fc46f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
@@ -71,8 +71,6 @@ public class LSMTimelineWriter {
 
   public static final int FILE_LAYER_ZERO = 0;
 
-  public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
-
   private final HoodieWriteConfig config;
   private final TaskContextSupplier taskContextSupplier;
   private final HoodieTableMetaClient metaClient;
@@ -312,7 +310,7 @@ public class LSMTimelineWriter {
           //TODO boundary to revisit in later pr to use HoodieSchema directly
           HoodieSchema schema = 
HoodieSchema.fromAvroSchema(HoodieLSMTimelineInstant.getClassSchema());
           try (ClosableIterator<IndexedRecord> iterator = 
reader.getIndexedRecordIterator(schema,
-                  schema)) {
+              schema)) {
             while (iterator.hasNext()) {
               IndexedRecord record = iterator.next();
               writer.write(record.get(0).toString(), new 
HoodieAvroIndexedRecord(record), schema);
@@ -382,16 +380,32 @@ public class LSMTimelineWriter {
   private List<String> 
getCandidateFiles(List<HoodieLSMTimelineManifest.LSMFileEntry> files, int 
filesBatch) throws IOException {
     List<String> candidates = new ArrayList<>();
     long totalFileLen = 0L;
-    for (int i = 0; i < filesBatch; i++) {
+    // try to find at most one group of files to compact
+    // 1. the number of files in the group should be at least 2
+    // 2. the number of files in the group should not exceed the batch size
+    // 3. the group's total file size should not exceed the threshold
+    // 4. all files in the group should be consecutive in instant order
+    for (int i = 0; i < files.size(); i++) {
       HoodieLSMTimelineManifest.LSMFileEntry fileEntry = files.get(i);
-      if (totalFileLen > MAX_FILE_SIZE_IN_BYTES) {
-        return candidates;
-      }
      // we may also need to consider a single file that is very close to the 
threshold in size,
      // to avoid write amplification,
      // e.g., two 800MB files compact into a 1.6GB file.
       totalFileLen += fileEntry.getFileLen();
       candidates.add(fileEntry.getFileName());
+      if (candidates.size() >= filesBatch) {
+        // stop once we reach the batch size
+        break;
+      }
+      if (totalFileLen > 
writeConfig.getTimelineCompactionTargetFileMaxBytes()) {
+        if (candidates.size() < 2) {
+          // reset if we have not reached the minimum number of files to compact
+          totalFileLen = 0L;
+          candidates.clear();
+        } else {
+          // stop once we reach the file size threshold
+          break;
+        }
+      }
     }
     return candidates;
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 0d6ae2180661..8c27a3949929 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -102,6 +102,11 @@ public class HoodieArchivalConfig extends HoodieConfig {
       .withDocumentation("If enabled, archival will proceed beyond savepoint, 
skipping savepoint commits."
           + " If disabled, archival will stop at the earliest savepoint 
commit.");
 
+  public static final ConfigProperty<Long> 
TIMELINE_COMPACTION_TARGET_FILE_MAX_BYTES = ConfigProperty
+        .key("hoodie.timeline.compaction.target.file.max.bytes")
+        .defaultValue(1000L * 1024 * 1024)
+        .markAdvanced()
+        .withDocumentation("Max size (in bytes) for each archived timeline 
file.");
   /**
    * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2dd08b72671a..047aa169a644 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1739,6 +1739,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieArchivalConfig.TIMELINE_COMPACTION_BATCH_SIZE);
   }
 
+  public long getTimelineCompactionTargetFileMaxBytes() {
+    return 
getLong(HoodieArchivalConfig.TIMELINE_COMPACTION_TARGET_FILE_MAX_BYTES);
+  }
+
   public int getParquetSmallFileLimit() {
     return getInt(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 460562012df5..a6c25b7e9096 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -725,6 +725,28 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     assertEquals(Arrays.asList(7, 8, 9), 
LSMTimeline.allSnapshotVersions(metaClient, 
metaClient.getArchivePath()).stream().sorted().collect(Collectors.toList()));
   }
 
+  @Test
+  public void testCompactionWithLargeL0File() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 
2, 3);
+    
writeConfig.setValue(HoodieArchivalConfig.TIMELINE_COMPACTION_TARGET_FILE_MAX_BYTES.key(),
 "3200");
+    // do ingestion and trigger archive actions here.
+    for (int i = 1; i < 19; i++) {
+      testTable.doWriteOperation(
+          WriteClientTestUtils.createNewInstantTime(), 
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2", "p3") : 
Collections.emptyList(),
+          i == 1 ? Arrays.asList("p1", "p2", "p3") : Arrays.asList("p1"), 2);
+      archiveAndGetCommitsList(writeConfig);
+    }
+    // first L0 file will be larger than max archived file size
+
+    // loading archived timeline and active timeline success
+    HoodieActiveTimeline rawActiveTimeline = 
TIMELINE_FACTORY.createActiveTimeline(metaClient, false);
+    HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline();
+    assertEquals(4 * 3 + 14, rawActiveTimeline.countInstants() + 
archivedTimeLine.countInstants());
+    // L0 file should be only one
+    HoodieLSMTimelineManifest latestManifest = 
LSMTimeline.latestSnapshotManifest(metaClient, metaClient.getArchivePath());
+    assertEquals(1, latestManifest.getFiles().stream().filter(f -> 
LSMTimeline.isFileFromLayer(f.getFileName(), 0)).count());
+  }
+
   @Test
   public void testReadArchivedCompactionPlan() throws Exception {
     HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 
5, HoodieTableType.MERGE_ON_READ);

Reply via email to