prashantwason opened a new issue, #18410:
URL: https://github.com/apache/hudi/issues/18410

   ## Description
   
   When a Flink streaming job performs a Hudi table version upgrade (e.g., v7 → 
v8), the `StreamWriteOperatorCoordinator.start()` method runs `tryUpgrade()` 
synchronously on the Pekko dispatcher thread. For tables with many archived 
timeline actions, the LSM timeline migration in 
`SevenToEightUpgradeHandler.upgradeToLSMTimeline()` performs extensive remote 
storage I/O that can block the dispatcher thread for 90+ seconds.
   
   This blocks the Pekko dispatcher from processing RPC messages including 
heartbeat responses, causing the ResourceManager to disconnect the JobManager 
due to heartbeat timeout. The resulting application shutdown terminates 
filesystem thread pools, which causes `RejectedExecutionException` in the 
still-running upgrade code.
   
   ## Root Cause
   
   Two issues compound:
   
   1. **`StreamWriteOperatorCoordinator.start()` runs the upgrade synchronously on 
the dispatcher thread**: The `tryUpgrade()` → `UpgradeDowngrade.run()` → 
`SevenToEightUpgradeHandler.upgrade()` chain runs entirely on the Pekko 
dispatcher thread, blocking heartbeat processing.
   
   2. **`upgradeToLSMTimeline()` performs excessive I/O**: It uses a small batch 
size (default 10) and calls `compactAndClean()` after every batch. With hundreds 
of archived actions, this produces 50+ batches × ~5 remote storage operations 
each, i.e., 250+ storage operations, each taking ~2 seconds on cloud storage.
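   The starvation in issue 1 can be reproduced in miniature with a plain 
single-threaded executor standing in for the Pekko dispatcher (class and method 
names below are illustrative, not Flink or Pekko APIs): a long synchronous task 
delays every task queued behind it, including heartbeats.
   
   ```java
   import java.util.concurrent.*;
   
   public class DispatcherBlockSketch {
       // Stand-in for the Pekko dispatcher: a single thread serving all RPCs.
       // Returns how long a "heartbeat" task queued behind a long-running
       // "upgrade" task had to wait before it could run.
       static long measureHeartbeatDelayMs() throws Exception {
           ExecutorService dispatcher = Executors.newSingleThreadExecutor();
           long start = System.nanoTime();
   
           // The synchronous "upgrade" occupies the only dispatcher thread...
           dispatcher.execute(() -> {
               try { Thread.sleep(200); } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           });
   
           // ...so the "heartbeat" RPC queued behind it cannot start until it ends.
           Future<Long> heartbeat =
                   dispatcher.submit(() -> (System.nanoTime() - start) / 1_000_000);
   
           long delayMs = heartbeat.get();
           dispatcher.shutdown();
           return delayMs;
       }
   
       public static void main(String[] args) throws Exception {
           System.out.println("heartbeat delayed ~" + measureHeartbeatDelayMs() + " ms");
       }
   }
   ```
   
   With a 90-second blocking task in place of the 200 ms sleep, the queued 
heartbeat exceeds any plausible heartbeat timeout.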
   
   ## Error Chain
   
   ```
   StreamWriteOperatorCoordinator.start() [dispatcher thread]
     → tryUpgrade() [blocks dispatcher for 90+ seconds]
       → upgradeToLSMTimeline() [heavy I/O]
     → Pekko dispatcher blocked, can't process heartbeat RPCs
       → ResourceManager heartbeat timeout → disconnect JobManager
         → Application shutdown → CFS ThreadPoolExecutor terminated
           → RejectedExecutionException in upgrade code
   ```
   
   ## Fix
   
   1. **Run upgrade on executor thread**: Move `tryUpgrade()`, 
`initMetadataTable()`, and `restoreEvents()` to the coordinator's 
single-threaded FIFO executor, freeing the dispatcher thread for heartbeats. 
Since all event handling also goes through this executor, the FIFO ordering 
guarantees the upgrade completes before any events are processed.
   
   2. **Optimize LSM timeline migration**: Use a larger batch size (500 instead 
of 10) and call `compactAndClean()` once at the end instead of after every 
batch, reducing remote storage operations from 250+ to ~6.
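   A minimal sketch of the executor hand-off in fix 1 (names are illustrative, 
not the actual `StreamWriteOperatorCoordinator` code): because the same 
single-threaded executor handles both initialization and events, FIFO submission 
order is the only synchronization needed.
   
   ```java
   import java.util.*;
   import java.util.concurrent.*;
   
   public class FifoCoordinatorSketch {
       // Stand-in for the coordinator's single-threaded FIFO executor.
       private final ExecutorService executor = Executors.newSingleThreadExecutor();
       private final List<String> order = Collections.synchronizedList(new ArrayList<>());
   
       // start() only enqueues the heavy work and returns immediately,
       // so the caller (the dispatcher thread) stays free for heartbeats.
       void start() {
           executor.execute(() -> order.add("tryUpgrade"));
           executor.execute(() -> order.add("initMetadataTable"));
           executor.execute(() -> order.add("restoreEvents"));
       }
   
       // Events go through the same executor; FIFO ordering guarantees
       // they run only after the initialization tasks above complete.
       void handleEvent(String event) {
           executor.execute(() -> order.add("event:" + event));
       }
   
       List<String> drain() throws InterruptedException {
           executor.shutdown();
           executor.awaitTermination(5, TimeUnit.SECONDS);
           return order;
       }
   
       public static void main(String[] args) throws Exception {
           FifoCoordinatorSketch c = new FifoCoordinatorSketch();
           c.start();
           c.handleEvent("writeMetadata");  // arrives while init is still queued
           // prints [tryUpgrade, initMetadataTable, restoreEvents, event:writeMetadata]
           System.out.println(c.drain());
       }
   }
   ```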
   
   ## Impact
   
   This affects any Flink streaming job that upgrades a Hudi table with a 
non-trivial archived timeline. The failure is non-deterministic and depends on 
timeline size and storage latency.
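   To make the I/O arithmetic in fix 2 concrete, here is a back-of-the-envelope 
sketch (the ~4 storage operations attributed to each `compactAndClean()` call is 
an assumption inferred from the counts above, not measured Hudi behavior):
   
   ```java
   public class LsmMigrationCostSketch {
       // Estimates remote storage operations for migrating `actions` archived
       // actions: one batched write per batch, plus an assumed ~4 extra
       // storage operations each time compactAndClean() runs.
       static int storageOps(int actions, int batchSize, boolean compactEveryBatch) {
           int batches = (actions + batchSize - 1) / batchSize;  // ceil division
           int compactions = compactEveryBatch ? batches : 1;
           return batches + compactions * 4;
       }
   
       public static void main(String[] args) {
           int actions = 500;
           int before = storageOps(actions, 10, true);   // old: 50 batches, compact each
           int after = storageOps(actions, 500, false);  // new: 1 batch, compact once
           // prints "250 ops -> 5 ops"
           System.out.println(before + " ops -> " + after + " ops");
       }
   }
   ```
   
   At ~2 seconds per operation on cloud storage, that is the difference between 
minutes of dispatcher blocking and a near-instant migration.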


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
