This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cf314f1c3c75 perf: Add dedicated batch size config for LSM timeline migration on upgrade (#19052)
cf314f1c3c75 is described below
commit cf314f1c3c75faf0863489f63bc2fc803080d4b2
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Jun 24 12:19:40 2026 +0800
perf: Add dedicated batch size config for LSM timeline migration on upgrade (#19052)
* perf: Add dedicated batch size config for LSM timeline migration on upgrade
---
.../apache/hudi/config/HoodieArchivalConfig.java | 14 ++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++
.../table/upgrade/SevenToEightUpgradeHandler.java | 13 ++--
.../upgrade/TestSevenToEightUpgradeHandler.java | 74 ++++++++++++++++++++++
.../apache/hudi/utils/TestFlinkWriteClients.java | 13 ++++
5 files changed, 113 insertions(+), 5 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 8854c87edeab..e97e268fa9f9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -88,6 +88,15 @@ public class HoodieArchivalConfig extends HoodieConfig {
.withDocumentation("Archiving of instants is batched in best-effort
manner, to pack more instants into a single"
+ " archive log. This config controls such archival batch size.");
+ public static final ConfigProperty<Integer>
MIGRATION_COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
+ .key("hoodie.timeline.migration.commits.archival.batch")
+ .defaultValue(500)
+ .markAdvanced()
+ .withDocumentation("Batch size used when migrating the legacy archived
timeline to the LSM timeline during a"
+ + " table version upgrade. A larger batch size minimizes the number
of parquet files (and the associated"
+ + " remote storage operations like exists check, parquet write and
manifest update) created during the"
+ + " one-time migration, which significantly reduces the total
migration time.");
+
public static final ConfigProperty<Integer> TIMELINE_COMPACTION_BATCH_SIZE =
ConfigProperty
.key("hoodie.timeline.compaction.batch.size")
.defaultValue(10)
@@ -211,6 +220,11 @@ public class HoodieArchivalConfig extends HoodieConfig {
return this;
}
+ public HoodieArchivalConfig.Builder
withMigrationCommitsArchivalBatchSize(int batchSize) {
+ archivalConfig.setValue(MIGRATION_COMMITS_ARCHIVAL_BATCH_SIZE,
String.valueOf(batchSize));
+ return this;
+ }
+
public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT,
String.valueOf(archiveBeyondSavepoint));
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 115c1216c450..b7bafe0f0b21 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2020,6 +2020,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
}
+ public int getMigrationCommitArchivalBatchSize() {
+ return getInt(HoodieArchivalConfig.MIGRATION_COMMITS_ARCHIVAL_BATCH_SIZE);
+ }
+
public boolean shouldBlockArchivalOnCleanECTR() {
return
getBoolean(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
index 7f7338e3da30..533e44353992 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
@@ -137,7 +137,7 @@ public class SevenToEightUpgradeHandler implements UpgradeHandler {
}, instants.size());
}
- upgradeToLSMTimeline(table, context, config);
+ upgradeToLSMTimeline(table, config);
return new UpgradeDowngrade.TableConfigChangeSet(tablePropsToAdd,
Collections.emptySet());
}
@@ -249,7 +249,7 @@ public class SevenToEightUpgradeHandler implements UpgradeHandler {
}
}
- static void upgradeToLSMTimeline(HoodieTable table, HoodieEngineContext
engineContext, HoodieWriteConfig config) {
+ static void upgradeToLSMTimeline(HoodieTable table, HoodieWriteConfig
config) {
table.getMetaClient().getTableConfig().getTimelineLayoutVersion().ifPresent(
timelineLayoutVersion ->
ValidationUtils.checkState(TimelineLayoutVersion.LAYOUT_VERSION_1.equals(timelineLayoutVersion),
"Upgrade to LSM timeline is only supported for layout version 1.
Given version: " + timelineLayoutVersion));
@@ -257,7 +257,12 @@ public class SevenToEightUpgradeHandler implements UpgradeHandler {
LegacyArchivedMetaEntryReader reader = new
LegacyArchivedMetaEntryReader(table.getMetaClient());
StoragePath archivePath = new
StoragePath(table.getMetaClient().getMetaPath(), "timeline/history");
LSMTimelineWriter lsmTimelineWriter =
LSMTimelineWriter.getInstance(config, table, Option.of(archivePath));
- int batchSize = config.getCommitArchivalBatchSize();
+ // Use a dedicated, larger batch size for the one-time migration to
minimize the number of parquet
+ // files created on remote storage. Each write() call involves multiple
remote storage operations
+ // (exists check, parquet write, manifest update); the regular archival
batch size is much smaller
+ // than what migration needs, so with hundreds of actions it creates
excessive I/O that
+ // significantly increases the migration time.
+ int batchSize = config.getMigrationCommitArchivalBatchSize();
List<ActiveAction> activeActionsBatch = new ArrayList<>(batchSize);
try (ClosableIterator<ActiveAction> iterator =
reader.getActiveActionsIterator()) {
while (iterator.hasNext()) {
@@ -265,7 +270,6 @@ public class SevenToEightUpgradeHandler implements UpgradeHandler {
// If the batch is full, write it to the LSM timeline
if (activeActionsBatch.size() == batchSize) {
lsmTimelineWriter.write(new ArrayList<>(activeActionsBatch),
Option.empty(), Option.empty());
- lsmTimelineWriter.compactAndClean(engineContext);
activeActionsBatch.clear();
}
}
@@ -273,7 +277,6 @@ public class SevenToEightUpgradeHandler implements UpgradeHandler {
// Write any remaining actions in the final batch
if (!activeActionsBatch.isEmpty()) {
lsmTimelineWriter.write(new ArrayList<>(activeActionsBatch),
Option.empty(), Option.empty());
- lsmTimelineWriter.compactAndClean(engineContext);
}
}
} catch (Exception e) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
index 66d03811b76f..400f37ba768c 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
@@ -19,14 +19,22 @@
package org.apache.hudi.table.upgrade;
+import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter;
+import org.apache.hudi.client.utils.LegacyArchivedMetaEntryReader;
import org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.ActiveAction;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -34,10 +42,14 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static
org.apache.hudi.common.table.HoodieTableConfig.BOOTSTRAP_INDEX_CLASS_NAME;
@@ -52,8 +64,15 @@ import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -153,6 +172,61 @@ class TestSevenToEightUpgradeHandler {
}
}
+ @Test
+ void testUpgradeToLSMTimelineSingleBatch() throws Exception {
+ // A single batch large enough to hold all actions should result in
exactly one write() call,
+ // proving the migration batch size config (not the regular archival batch
size) drives batching.
+ LSMTimelineWriter writer = runMigration(500, 4);
+ verify(writer, times(1)).write(any(), any(), any());
+ verify(writer, never()).compactAndClean(any());
+ }
+
+ @Test
+ void testUpgradeToLSMTimelineBatchesByMigrationBatchSize() throws Exception {
+ // With more actions than the migration batch size, the in-loop batching
branch must fire:
+ // 4 actions with a batch size of 2 -> [2, 2] -> 2 write() calls. This
pins that the configured
+ // migration batch size (not just "all actions in one batch") actually
governs batching.
+ LSMTimelineWriter writer = runMigration(2, 4);
+ verify(writer, times(2)).write(any(), any(), any());
+ verify(writer, never()).compactAndClean(any());
+ }
+
+ /**
+ * Runs {@link SevenToEightUpgradeHandler#upgradeToLSMTimeline} with the
given migration batch size
+ * over the given number of archived actions, and returns the (mocked) LSM
timeline writer for verification.
+ */
+ private LSMTimelineWriter runMigration(int migrationBatchSize, int
totalActions) {
+ HoodieTable table = mock(HoodieTable.class);
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+ when(table.getMetaClient()).thenReturn(metaClient);
+ when(metaClient.getTableConfig()).thenReturn(tableConfig);
+
when(tableConfig.getTimelineLayoutVersion()).thenReturn(Option.of(TimelineLayoutVersion.LAYOUT_VERSION_1));
+ when(metaClient.getMetaPath()).thenReturn(new StoragePath("/tmp/.hoodie"));
+
when(config.getMigrationCommitArchivalBatchSize()).thenReturn(migrationBatchSize);
+ // The regular archival batch size must not be consulted during migration.
+ lenient().when(config.getCommitArchivalBatchSize()).thenReturn(1);
+
+ List<ActiveAction> actions = new ArrayList<>();
+ for (int i = 0; i < totalActions; i++) {
+ actions.add(mock(ActiveAction.class));
+ }
+
+ LSMTimelineWriter writer = mock(LSMTimelineWriter.class);
+ try (MockedStatic<LSMTimelineWriter> mockedWriterStatic =
mockStatic(LSMTimelineWriter.class);
+ MockedConstruction<LegacyArchivedMetaEntryReader> mockedReader =
Mockito.mockConstruction(
+ LegacyArchivedMetaEntryReader.class,
+ (readerMock, ctx) -> when(readerMock.getActiveActionsIterator())
+ .thenReturn(ClosableIterator.wrap(actions.iterator())))) {
+ mockedWriterStatic.when(() -> LSMTimelineWriter.getInstance(
+ any(HoodieWriteConfig.class), any(HoodieTable.class),
any(Option.class))).thenReturn(writer);
+
+ SevenToEightUpgradeHandler.upgradeToLSMTimeline(table, config);
+ }
+ return writer;
+ }
+
private static Map<ConfigProperty, String> createMap(Object... keyValues) {
Map<ConfigProperty, String> map = new HashMap<>();
for (int i = 0; i < keyValues.length; i += 2) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 3910b91daebd..b357f0fae12f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
@@ -121,6 +122,18 @@ public class TestFlinkWriteClients {
assertEquals(12, writeConfig.getGlobalRecordLevelIndexMinFileGroupCount());
}
+ @Test
+ void testUserConfiguredMigrationCommitArchivalBatchSizeIsPropagated() {
+ // A raw hoodie.* property set on the Flink configuration must be
propagated to the write config
+ // (and therefore reach the upgrade handler that reads it during the v7 ->
v8 LSM timeline migration).
+
conf.setString(HoodieArchivalConfig.MIGRATION_COMMITS_ARCHIVAL_BATCH_SIZE.key(),
"123");
+ HoodieWriteConfig writeConfig =
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+ assertEquals(123, writeConfig.getMigrationCommitArchivalBatchSize());
+ // The regular archival batch size must stay independent at its own
default.
+
assertEquals(Integer.parseInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE.defaultValue()),
+ writeConfig.getCommitArchivalBatchSize());
+ }
+
@Test
void testHoodieClientConfigPrecedence() {
conf.setString(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(),
"false");