This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cf314f1c3c75 perf: Add dedicated batch size config for LSM timeline 
migration on upgrade (#19052)
cf314f1c3c75 is described below

commit cf314f1c3c75faf0863489f63bc2fc803080d4b2
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Jun 24 12:19:40 2026 +0800

    perf: Add dedicated batch size config for LSM timeline migration on upgrade 
(#19052)
    
    * perf: Add dedicated batch size config for LSM timeline migration on 
upgrade
---
 .../apache/hudi/config/HoodieArchivalConfig.java   | 14 ++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../table/upgrade/SevenToEightUpgradeHandler.java  | 13 ++--
 .../upgrade/TestSevenToEightUpgradeHandler.java    | 74 ++++++++++++++++++++++
 .../apache/hudi/utils/TestFlinkWriteClients.java   | 13 ++++
 5 files changed, 113 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 8854c87edeab..e97e268fa9f9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -88,6 +88,15 @@ public class HoodieArchivalConfig extends HoodieConfig {
       .withDocumentation("Archiving of instants is batched in best-effort 
manner, to pack more instants into a single"
           + " archive log. This config controls such archival batch size.");
 
+  public static final ConfigProperty<Integer> 
MIGRATION_COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
+      .key("hoodie.timeline.migration.commits.archival.batch")
+      .defaultValue(500)
+      .markAdvanced()
+      .withDocumentation("Batch size used when migrating the legacy archived 
timeline to the LSM timeline during a"
+          + " table version upgrade. A larger batch size minimizes the number 
of parquet files (and the associated"
+          + " remote storage operations like exists check, parquet write and 
manifest update) created during the"
+          + " one-time migration, which significantly reduces the total 
migration time.");
+
   public static final ConfigProperty<Integer> TIMELINE_COMPACTION_BATCH_SIZE = 
ConfigProperty
       .key("hoodie.timeline.compaction.batch.size")
       .defaultValue(10)
@@ -211,6 +220,11 @@ public class HoodieArchivalConfig extends HoodieConfig {
       return this;
     }
 
+    public HoodieArchivalConfig.Builder 
withMigrationCommitsArchivalBatchSize(int batchSize) {
+      archivalConfig.setValue(MIGRATION_COMMITS_ARCHIVAL_BATCH_SIZE, 
String.valueOf(batchSize));
+      return this;
+    }
+
     public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
       archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT, 
String.valueOf(archiveBeyondSavepoint));
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 115c1216c450..b7bafe0f0b21 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2020,6 +2020,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
   }
 
+  public int getMigrationCommitArchivalBatchSize() {
+    return getInt(HoodieArchivalConfig.MIGRATION_COMMITS_ARCHIVAL_BATCH_SIZE);
+  }
+
   public boolean shouldBlockArchivalOnCleanECTR() {
     return 
getBoolean(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
index 7f7338e3da30..533e44353992 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
@@ -137,7 +137,7 @@ public class SevenToEightUpgradeHandler implements 
UpgradeHandler {
       }, instants.size());
     }
 
-    upgradeToLSMTimeline(table, context, config);
+    upgradeToLSMTimeline(table, config);
 
     return new UpgradeDowngrade.TableConfigChangeSet(tablePropsToAdd, 
Collections.emptySet());
   }
@@ -249,7 +249,7 @@ public class SevenToEightUpgradeHandler implements 
UpgradeHandler {
     }
   }
 
-  static void upgradeToLSMTimeline(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig config) {
+  static void upgradeToLSMTimeline(HoodieTable table, HoodieWriteConfig 
config) {
     
table.getMetaClient().getTableConfig().getTimelineLayoutVersion().ifPresent(
         timelineLayoutVersion -> 
ValidationUtils.checkState(TimelineLayoutVersion.LAYOUT_VERSION_1.equals(timelineLayoutVersion),
             "Upgrade to LSM timeline is only supported for layout version 1. 
Given version: " + timelineLayoutVersion));
@@ -257,7 +257,12 @@ public class SevenToEightUpgradeHandler implements 
UpgradeHandler {
       LegacyArchivedMetaEntryReader reader = new 
LegacyArchivedMetaEntryReader(table.getMetaClient());
       StoragePath archivePath = new 
StoragePath(table.getMetaClient().getMetaPath(), "timeline/history");
       LSMTimelineWriter lsmTimelineWriter = 
LSMTimelineWriter.getInstance(config, table, Option.of(archivePath));
-      int batchSize = config.getCommitArchivalBatchSize();
+      // Use a dedicated, larger batch size for the one-time migration to 
minimize the number of parquet
+      // files created on remote storage. Each write() call involves multiple 
remote storage operations
+      // (exists check, parquet write, manifest update); the regular archival 
batch size is much smaller
+      // than what migration needs, so with hundreds of actions it creates 
excessive I/O that
+      // significantly increases the migration time.
+      int batchSize = config.getMigrationCommitArchivalBatchSize();
       List<ActiveAction> activeActionsBatch = new ArrayList<>(batchSize);
       try (ClosableIterator<ActiveAction> iterator = 
reader.getActiveActionsIterator()) {
         while (iterator.hasNext()) {
@@ -265,7 +270,6 @@ public class SevenToEightUpgradeHandler implements 
UpgradeHandler {
           // If the batch is full, write it to the LSM timeline
           if (activeActionsBatch.size() == batchSize) {
             lsmTimelineWriter.write(new ArrayList<>(activeActionsBatch), 
Option.empty(), Option.empty());
-            lsmTimelineWriter.compactAndClean(engineContext);
             activeActionsBatch.clear();
           }
         }
@@ -273,7 +277,6 @@ public class SevenToEightUpgradeHandler implements 
UpgradeHandler {
         // Write any remaining actions in the final batch
         if (!activeActionsBatch.isEmpty()) {
           lsmTimelineWriter.write(new ArrayList<>(activeActionsBatch), 
Option.empty(), Option.empty());
-          lsmTimelineWriter.compactAndClean(engineContext);
         }
       }
     } catch (Exception e) {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
index 66d03811b76f..400f37ba768c 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
@@ -19,14 +19,22 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter;
+import org.apache.hudi.client.utils.LegacyArchivedMetaEntryReader;
 import org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.ActiveAction;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -34,10 +42,14 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.BOOTSTRAP_INDEX_CLASS_NAME;
@@ -52,8 +64,15 @@ import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -153,6 +172,61 @@ class TestSevenToEightUpgradeHandler {
     }
   }
 
+  @Test
+  void testUpgradeToLSMTimelineSingleBatch() throws Exception {
+    // A single batch large enough to hold all actions should result in 
exactly one write() call,
+    // proving the migration batch size config (not the regular archival batch 
size) drives batching.
+    LSMTimelineWriter writer = runMigration(500, 4);
+    verify(writer, times(1)).write(any(), any(), any());
+    verify(writer, never()).compactAndClean(any());
+  }
+
+  @Test
+  void testUpgradeToLSMTimelineBatchesByMigrationBatchSize() throws Exception {
+    // With more actions than the migration batch size, the in-loop batching 
branch must fire:
+    // 4 actions with a batch size of 2 -> [2, 2] -> 2 write() calls. This 
pins that the configured
+    // migration batch size (not just "all actions in one batch") actually 
governs batching.
+    LSMTimelineWriter writer = runMigration(2, 4);
+    verify(writer, times(2)).write(any(), any(), any());
+    verify(writer, never()).compactAndClean(any());
+  }
+
+  /**
+   * Runs {@link SevenToEightUpgradeHandler#upgradeToLSMTimeline} with the 
given migration batch size
+   * over the given number of archived actions, and returns the (mocked) LSM 
timeline writer for verification.
+   */
+  private LSMTimelineWriter runMigration(int migrationBatchSize, int 
totalActions) {
+    HoodieTable table = mock(HoodieTable.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+    when(table.getMetaClient()).thenReturn(metaClient);
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    
when(tableConfig.getTimelineLayoutVersion()).thenReturn(Option.of(TimelineLayoutVersion.LAYOUT_VERSION_1));
+    when(metaClient.getMetaPath()).thenReturn(new StoragePath("/tmp/.hoodie"));
+    
when(config.getMigrationCommitArchivalBatchSize()).thenReturn(migrationBatchSize);
+    // The regular archival batch size must not be consulted during migration.
+    lenient().when(config.getCommitArchivalBatchSize()).thenReturn(1);
+
+    List<ActiveAction> actions = new ArrayList<>();
+    for (int i = 0; i < totalActions; i++) {
+      actions.add(mock(ActiveAction.class));
+    }
+
+    LSMTimelineWriter writer = mock(LSMTimelineWriter.class);
+    try (MockedStatic<LSMTimelineWriter> mockedWriterStatic = 
mockStatic(LSMTimelineWriter.class);
+         MockedConstruction<LegacyArchivedMetaEntryReader> mockedReader = 
Mockito.mockConstruction(
+             LegacyArchivedMetaEntryReader.class,
+             (readerMock, ctx) -> when(readerMock.getActiveActionsIterator())
+                 .thenReturn(ClosableIterator.wrap(actions.iterator())))) {
+      mockedWriterStatic.when(() -> LSMTimelineWriter.getInstance(
+          any(HoodieWriteConfig.class), any(HoodieTable.class), 
any(Option.class))).thenReturn(writer);
+
+      SevenToEightUpgradeHandler.upgradeToLSMTimeline(table, config);
+    }
+    return writer;
+  }
+
   private static Map<ConfigProperty, String> createMap(Object... keyValues) {
     Map<ConfigProperty, String> map = new HashMap<>();
     for (int i = 0; i < keyValues.length; i += 2) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 3910b91daebd..b357f0fae12f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex;
@@ -121,6 +122,18 @@ public class TestFlinkWriteClients {
     assertEquals(12, writeConfig.getGlobalRecordLevelIndexMinFileGroupCount());
   }
 
+  @Test
+  void testUserConfiguredMigrationCommitArchivalBatchSizeIsPropagated() {
+    // A raw hoodie.* property set on the Flink configuration must be 
propagated to the write config
+    // (and therefore reach the upgrade handler that reads it during the v7 -> 
v8 LSM timeline migration).
+    
conf.setString(HoodieArchivalConfig.MIGRATION_COMMITS_ARCHIVAL_BATCH_SIZE.key(),
 "123");
+    HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+    assertEquals(123, writeConfig.getMigrationCommitArchivalBatchSize());
+    // The regular archival batch size must stay independent at its own 
default.
+    
assertEquals(Integer.parseInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE.defaultValue()),
+        writeConfig.getCommitArchivalBatchSize());
+  }
+
   @Test
   void testHoodieClientConfigPrecedence() {
     conf.setString(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), 
"false");

Reply via email to