cshuo commented on code in PR #18411:
URL: https://github.com/apache/hudi/pull/18411#discussion_r3007251875


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -228,14 +228,18 @@ public void start() throws Exception {
       this.metaClient = initTableIfNotExists(this.conf);
       // the write client must create after the table creation
       this.writeClient = FlinkWriteClients.createWriteClient(conf);
-      this.writeClient.tryUpgrade(instant, this.metaClient);
-      initMetadataTable(this.writeClient);
 
-      if (tableState.scheduleMdtCompaction) {
-        this.metadataWriteClient = StreamerUtil.createMetadataWriteClient(writeClient);
-      }
-
-      // start the executor
+      // Create executors BEFORE the upgrade so that the heavy initialization
+      // (upgrade, metadata table init, event restoration) can run on the executor
+      // thread instead of the Pekko dispatcher thread. Running the upgrade
+      // synchronously on the dispatcher thread blocks heartbeat responses and
+      // causes the ResourceManager to disconnect the JobManager when upgrades
+      // take longer than the heartbeat timeout (e.g., LSM timeline migration
+      // with hundreds of archived actions).
+      //
+      // Since the executor is single-threaded and FIFO, submitting the

Review Comment:
   Initialization is submitted asynchronously to the executor, while instant-time requests are served immediately by a different executor, `instantRequestExecutor`. Nothing orders the two, so `startInstant()` can race ahead of `tryUpgrade()` and the metadata table initialization, which can lead to unexpected results.
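
   To illustrate the ordering problem, here is a minimal, self-contained Java sketch. It is not the coordinator's code: `InitGateSketch`, `run`, and the event strings are hypothetical, the 200 ms sleep stands in for a slow upgrade plus MDT init, and the two single-threaded executors only mimic the init executor and `instantRequestExecutor`. Each executor is FIFO on its own, but that gives no ordering across executors, so the ungated run will typically record `instant-started` before `init-finished`. Gating the instant request on the initialization future forces the intended order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: init work and instant requests on two separate executors. */
public class InitGateSketch {

  /** Returns the order in which the two simulated steps actually ran. */
  static List<String> run(boolean gated) {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    ExecutorService initExecutor = Executors.newSingleThreadExecutor();
    ExecutorService instantRequestExecutor = Executors.newSingleThreadExecutor();
    try {
      // Stand-in for a slow tryUpgrade() + metadata table init.
      CompletableFuture<Void> initDone = CompletableFuture.runAsync(() -> {
        sleep(200);
        events.add("init-finished");
      }, initExecutor);

      // Stand-in for a startInstant() request served by another executor.
      CompletableFuture<Void> instant = CompletableFuture.runAsync(() -> {
        if (gated) {
          initDone.join(); // wait until initialization has completed
        }
        events.add("instant-started");
      }, instantRequestExecutor);

      instant.join();
      initDone.join();
      return new ArrayList<>(events);
    } finally {
      initExecutor.shutdownNow();
      instantRequestExecutor.shutdownNow();
    }
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println("ungated: " + run(false));
    System.out.println("gated:   " + run(true));
  }
}
```

   Blocking with `join()` keeps the sketch short; a non-blocking variant could chain the request with `initDone.thenRunAsync(..., instantRequestExecutor)` instead. Either way, instant requests need an explicit dependency on the initialization, not just a FIFO executor.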
   
   Besides, the initialization and running of the ingestion pipeline also depend on the MDT initialization when streaming RLI is enabled; e.g., `RecordIndexPartitioner` needs to know the RLI file group count before shuffling the records.
   
   
https://github.com/apache/hudi/blob/d241b0901b271b2ff6b86a03904bbfd25ec300c8/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java#L53-L57
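
   Similarly, a minimal sketch of why the partitioner depends on MDT init. `RliPartitionerSketch` and its methods are hypothetical and are not Hudi's `RecordIndexPartitioner`; the `String.hashCode()` plus `Math.floorMod` mapping is only for illustration and may differ from how Hudi actually assigns record keys to RLI file groups. The point is only that no record can be routed until the file group count is known.

```java
import java.util.OptionalInt;

/** Hypothetical sketch: routes record keys to RLI file groups once the count is known. */
public class RliPartitionerSketch {

  // Empty until the (simulated) MDT initialization publishes the count.
  private volatile OptionalInt rliFileGroupCount = OptionalInt.empty();

  /** Called by the (simulated) initialization once the RLI partition exists. */
  void onMetadataTableInitialized(int fileGroupCount) {
    if (fileGroupCount <= 0) {
      throw new IllegalArgumentException("file group count must be positive");
    }
    this.rliFileGroupCount = OptionalInt.of(fileGroupCount);
  }

  /** Maps a record key to a file group index in [0, count). */
  int partition(String recordKey) {
    OptionalInt count = rliFileGroupCount;
    if (!count.isPresent()) {
      // Without the count there is no correct target file group for the record.
      throw new IllegalStateException("RLI file group count unknown: MDT not initialized yet");
    }
    return Math.floorMod(recordKey.hashCode(), count.getAsInt());
  }

  public static void main(String[] args) {
    RliPartitionerSketch p = new RliPartitionerSketch();
    try {
      p.partition("key-1");
    } catch (IllegalStateException e) {
      System.out.println("before init: " + e.getMessage());
    }
    p.onMetadataTableInitialized(4);
    System.out.println("after init: key-1 -> " + p.partition("key-1"));
  }
}
```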
   
   Overall, it's not safe to start the pipeline and begin processing inputs before the upgrade and MDT initialization have finished.



-- 
This is an automated message from the Apache Git Service.