This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eb7cc5e2cbcb fix: Fix timeline compaction being blocked by an
archived file that is too large (#17784)
eb7cc5e2cbcb is described below
commit eb7cc5e2cbcb03a7b57fad0a38586b16b01012cc
Author: chaoyang <[email protected]>
AuthorDate: Wed Jan 7 15:52:28 2026 +0800
fix: Fix timeline compaction being blocked by an archived file that is
too large (#17784)
* fix: Fix timeline compaction being blocked by an L0 file that is too
large
---------
Signed-off-by: TheR1sing3un <[email protected]>
---
.../timeline/versioning/v2/LSMTimelineWriter.java | 28 ++++++++++++++++------
.../apache/hudi/config/HoodieArchivalConfig.java | 5 ++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++++
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 22 +++++++++++++++++
4 files changed, 52 insertions(+), 7 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
index 7641ab5ad02b..3ce2078fc46f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
@@ -71,8 +71,6 @@ public class LSMTimelineWriter {
public static final int FILE_LAYER_ZERO = 0;
- public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
-
private final HoodieWriteConfig config;
private final TaskContextSupplier taskContextSupplier;
private final HoodieTableMetaClient metaClient;
@@ -312,7 +310,7 @@ public class LSMTimelineWriter {
//TODO boundary to revisit in later pr to use HoodieSchema directly
HoodieSchema schema =
HoodieSchema.fromAvroSchema(HoodieLSMTimelineInstant.getClassSchema());
try (ClosableIterator<IndexedRecord> iterator =
reader.getIndexedRecordIterator(schema,
- schema)) {
+ schema)) {
while (iterator.hasNext()) {
IndexedRecord record = iterator.next();
writer.write(record.get(0).toString(), new
HoodieAvroIndexedRecord(record), schema);
@@ -382,16 +380,32 @@ public class LSMTimelineWriter {
private List<String>
getCandidateFiles(List<HoodieLSMTimelineManifest.LSMFileEntry> files, int
filesBatch) throws IOException {
List<String> candidates = new ArrayList<>();
long totalFileLen = 0L;
- for (int i = 0; i < filesBatch; i++) {
+ // try to find at most one group of files to compact
+ // 1. files num in the group should be at least 2
+ // 2. files num in the group should not exceed the batch size
+ // 3. the group's total file size should not exceed the threshold
+ // 4. all files in the group should be consecutive in instant order
+ for (int i = 0; i < files.size(); i++) {
HoodieLSMTimelineManifest.LSMFileEntry fileEntry = files.get(i);
- if (totalFileLen > MAX_FILE_SIZE_IN_BYTES) {
- return candidates;
- }
// we may also need to consider a single file that is very close to the
threshold in size,
// to avoid the write amplification,
// for e.g, two 800MB files compact into a 1.6GB file.
totalFileLen += fileEntry.getFileLen();
candidates.add(fileEntry.getFileName());
+ if (candidates.size() >= filesBatch) {
+ // stop once we reach the batch size
+ break;
+ }
+ if (totalFileLen >
writeConfig.getTimelineCompactionTargetFileMaxBytes()) {
+ if (candidates.size() < 2) {
+ // reset if we have not reached the minimum files num to compact
+ totalFileLen = 0L;
+ candidates.clear();
+ } else {
+ // stop once we reach the file size threshold
+ break;
+ }
+ }
}
return candidates;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 0d6ae2180661..8c27a3949929 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -102,6 +102,11 @@ public class HoodieArchivalConfig extends HoodieConfig {
.withDocumentation("If enabled, archival will proceed beyond savepoint,
skipping savepoint commits."
+ " If disabled, archival will stop at the earliest savepoint
commit.");
+ public static final ConfigProperty<Long>
TIMELINE_COMPACTION_TARGET_FILE_MAX_BYTES = ConfigProperty
+ .key("hoodie.timeline.compaction.target.file.max.bytes")
+ .defaultValue(1000L * 1024 * 1024)
+ .markAdvanced()
+ .withDocumentation("Max size (in bytes) for each archived timeline
file.");
/**
* @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2dd08b72671a..047aa169a644 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1739,6 +1739,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(HoodieArchivalConfig.TIMELINE_COMPACTION_BATCH_SIZE);
}
+ public long getTimelineCompactionTargetFileMaxBytes() {
+ return
getLong(HoodieArchivalConfig.TIMELINE_COMPACTION_TARGET_FILE_MAX_BYTES);
+ }
+
public int getParquetSmallFileLimit() {
return getInt(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 460562012df5..a6c25b7e9096 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -725,6 +725,28 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
assertEquals(Arrays.asList(7, 8, 9),
LSMTimeline.allSnapshotVersions(metaClient,
metaClient.getArchivePath()).stream().sorted().collect(Collectors.toList()));
}
+ @Test
+ public void testCompactionWithLargeL0File() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5,
2, 3);
+
writeConfig.setValue(HoodieArchivalConfig.TIMELINE_COMPACTION_TARGET_FILE_MAX_BYTES.key(),
"3200");
+ // do ingestion and trigger archive actions here.
+ for (int i = 1; i < 19; i++) {
+ testTable.doWriteOperation(
+ WriteClientTestUtils.createNewInstantTime(),
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2", "p3") :
Collections.emptyList(),
+ i == 1 ? Arrays.asList("p1", "p2", "p3") : Arrays.asList("p1"), 2);
+ archiveAndGetCommitsList(writeConfig);
+ }
+ // first L0 file will be larger than max archived file size
+
+ // loading archived timeline and active timeline success
+ HoodieActiveTimeline rawActiveTimeline =
TIMELINE_FACTORY.createActiveTimeline(metaClient, false);
+ HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline();
+ assertEquals(4 * 3 + 14, rawActiveTimeline.countInstants() +
archivedTimeLine.countInstants());
+ // L0 file should be only one
+ HoodieLSMTimelineManifest latestManifest =
LSMTimeline.latestSnapshotManifest(metaClient, metaClient.getArchivePath());
+ assertEquals(1, latestManifest.getFiles().stream().filter(f ->
LSMTimeline.isFileFromLayer(f.getFileName(), 0)).count());
+ }
+
@Test
public void testReadArchivedCompactionPlan() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5,
5, HoodieTableType.MERGE_ON_READ);