prashantwason commented on code in PR #18411:
URL: https://github.com/apache/hudi/pull/18411#discussion_r3011357839


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java:
##########
@@ -249,6 +249,12 @@ static void upgradeKeyGeneratorType(HoodieTableConfig tableConfig, Map<ConfigPro
     }
   }
 
+  // Use a large batch size for migration to minimize the number of parquet files created
+  // on remote storage. Each write() call involves multiple remote storage operations (exists check,
+  // parquet write, manifest update). Using the default archival batch size (10) with hundreds of
+  // actions creates excessive I/O that significantly increases the total migration time.
+  private static final int MIGRATION_BATCH_SIZE = 500;

Review Comment:
   For a table with N archived actions, the original code runs about `N/10` batches, each doing 1 write + 1 compactAndClean, which comes to ~5 remote storage ops per batch. With `MIGRATION_BATCH_SIZE = 500`, it runs about `N/500` batches with 1 write each, plus 1 final compactAndClean.
   
   Concrete numbers:
   - **50 actions**: 25 ops → 5 ops (~80% reduction)
   - **200 actions**: 100 ops → 5 ops (~95% reduction)  
   - **500 actions**: 250 ops → 6 ops (~97% reduction)
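   
   The "before" counts and the percentages above can be re-derived with a small sketch. It assumes the ~5 ops per batch estimate from this comment and ceiling division for the batch count. The class and method names are hypothetical, and the "after" counts are taken from the list as given rather than modeled.

```java
public class MigrationOpsSketch {
    // Assumption from this comment: each old-style batch costs ~5 remote storage ops.
    static final int OPS_PER_OLD_BATCH = 5;

    // Number of batches needed for `actions` archived actions (ceiling division).
    static int batchCount(int actions, int batchSize) {
        return (actions + batchSize - 1) / batchSize;
    }

    // Estimated ops for the original code path with the default batch size of 10.
    static int opsBefore(int actions) {
        return batchCount(actions, 10) * OPS_PER_OLD_BATCH;
    }

    // Percentage reduction going from `before` ops to `after` ops.
    static double reductionPercent(int before, int after) {
        return 100.0 * (before - after) / before;
    }

    public static void main(String[] args) {
        int[] actions = {50, 200, 500};
        int[] opsAfter = {5, 5, 6}; // taken from the list in this comment, not derived here
        for (int i = 0; i < actions.length; i++) {
            int before = opsBefore(actions[i]);
            System.out.printf("%d actions: %d ops -> %d ops (%.1f%% reduction)%n",
                actions[i], before, opsAfter[i], reductionPercent(before, opsAfter[i]));
        }
    }
}
```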
   
   The biggest gain is from moving `compactAndClean()` to end-only. In the 
original code, each compaction re-processes all previously written L0 files, so 
the cost grows quadratically with the number of batches. With end-only 
compaction, it's a single O(N) pass.
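   
   To make the quadratic growth concrete, here is a rough cost model. It assumes, as described above, that the compaction after batch `i` re-reads all `i` L0 files written so far; the class and method names are hypothetical and this is not the actual Hudi compaction logic.

```java
public class CompactionCostSketch {
    // Compact after every batch: batch i re-reads the i L0 files written so far,
    // so the total is 1 + 2 + ... + batches = batches * (batches + 1) / 2.
    static long filesReadCompactingEveryBatch(int batches) {
        long total = 0;
        for (int i = 1; i <= batches; i++) {
            total += i;
        }
        return total;
    }

    // Compact once at the end: each written L0 file is read a single time.
    static long filesReadCompactingOnce(int batches) {
        return batches;
    }

    public static void main(String[] args) {
        int batches = 50; // e.g. 500 archived actions at the default batch size of 10
        System.out.println("per-batch compaction: " + filesReadCompactingEveryBatch(batches));
        System.out.println("end-only compaction:  " + filesReadCompactingOnce(batches));
    }
}
```

   Under this model, 50 batches cost 1275 L0 file reads with per-batch compaction versus 50 with a single final pass, and the larger batch size shrinks the batch count itself on top of that.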
   
   Pushed a revised commit that drops the async executor change (per your and 
@cshuo's feedback) and keeps only this I/O optimization.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -244,6 +248,17 @@ public void start() throws Exception {
           .threadFactory(getThreadFactory("instant-request"))
           .exceptionHook((errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
           .build();
+
+      // Run upgrade and post-upgrade initialization on the executor thread.
+      this.executor.execute(() -> {
+        this.writeClient.tryUpgrade(instant, this.metaClient);
+        initMetadataTable(this.writeClient);
+        if (tableState.scheduleMdtCompaction) {
+          this.metadataWriteClient = StreamerUtil.createMetadataWriteClient(writeClient);

Review Comment:
   Agreed. Reverted the async change — the coordinator must block start() until 
upgrade and MDT init are complete. The revised commit keeps only the I/O 
optimization in SevenToEightUpgradeHandler (larger batch size + single 
compactAndClean at end).


