cshuo commented on code in PR #18411:
URL: https://github.com/apache/hudi/pull/18411#discussion_r3007251875
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -228,14 +228,18 @@ public void start() throws Exception {
this.metaClient = initTableIfNotExists(this.conf);
// the write client must create after the table creation
this.writeClient = FlinkWriteClients.createWriteClient(conf);
- this.writeClient.tryUpgrade(instant, this.metaClient);
- initMetadataTable(this.writeClient);
- if (tableState.scheduleMdtCompaction) {
- this.metadataWriteClient = StreamerUtil.createMetadataWriteClient(writeClient);
- }
-
- // start the executor
+ // Create executors BEFORE the upgrade so that the heavy initialization
+ // (upgrade, metadata table init, event restoration) can run on the executor
+ // thread instead of the Pekko dispatcher thread. Running the upgrade
+ // synchronously on the dispatcher thread blocks heartbeat responses and
+ // causes the ResourceManager to disconnect the JobManager when upgrades
+ // take longer than the heartbeat timeout (e.g., LSM timeline migration
+ // with hundreds of archived actions).
+ //
+ // Since the executor is single-threaded and FIFO, submitting the
Review Comment:
Initialization is submitted asynchronously to `executor`, while instant-time
requests run immediately on a separate executor, `instantRequestExecutor`.
That allows `startInstant()` to race ahead of `tryUpgrade()` and the metadata
table initialization, which can lead to unexpected results.
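A minimal sketch of the race, under a simplified model of the coordinator. The class `InitGateSketch`, the method `run`, and the 200 ms sleep standing in for `tryUpgrade()` plus MDT init are hypothetical, not Hudi code. It also shows one possible fix: chaining the instant request on an init future, so it cannot overtake initialization.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of two coordinator executors; not StreamWriteOperatorCoordinator. */
public class InitGateSketch {

  /** Runs one start() plus one instant request; returns the order in which the steps ran. */
  public static List<String> run(boolean gated) throws Exception {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    ExecutorService executor = Executors.newSingleThreadExecutor();
    ExecutorService instantRequestExecutor = Executors.newSingleThreadExecutor();
    try {
      // start(): the heavy init is only *submitted*, so start() returns right away.
      CompletableFuture<Void> initFuture = CompletableFuture.runAsync(() -> {
        sleepQuietly(200); // stands in for tryUpgrade() + MDT init
        events.add("init-done");
      }, executor);

      // An instant request arriving right after start().
      Runnable startInstant = () -> events.add("instant-started");
      CompletableFuture<Void> instant = gated
          // Fix: run the request only after init has completed.
          ? initFuture.thenRunAsync(startInstant, instantRequestExecutor)
          // Race: nothing orders this executor against the init executor.
          : CompletableFuture.runAsync(startInstant, instantRequestExecutor);

      CompletableFuture.allOf(initFuture, instant).get(5, TimeUnit.SECONDS);
      return new ArrayList<>(events);
    } finally {
      executor.shutdownNow();
      instantRequestExecutor.shutdownNow();
    }
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws Exception {
    // Order not guaranteed; the instant request usually wins here.
    System.out.println("ungated: " + run(false));
    // Always [init-done, instant-started].
    System.out.println("gated:   " + run(true));
  }
}
```

Chaining with `thenRunAsync` keeps the ordering guarantee without parking a thread while init is in progress. Calling `initFuture.join()` inside the request handler would also order the steps, but it would block `instantRequestExecutor` until init finishes.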
Besides, initializing and running the ingestion pipeline also depends on the
MDT initialization when streaming RLI is enabled. For example,
`RecordIndexPartitioner` needs to know the RLI file group count before it can
shuffle the records:
https://github.com/apache/hudi/blob/d241b0901b271b2ff6b86a03904bbfd25ec300c8/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java#L53-L57
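To make the `RecordIndexPartitioner` dependency concrete, here is a hypothetical sketch. `RliRoutingSketch`, its hash-modulo routing rule, and the convention that a count of 0 means "MDT not initialized" are assumptions for illustration; the real logic is at the linked lines. A partitioner that reads the RLI file group count before MDT init has no correct target for a record, so it can only fail or guess.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; not Hudi's RecordIndexPartitioner. */
public class RliRoutingSketch {

  // Stand-in for "RLI file group count read from the metadata table".
  // 0 means the MDT (and its RLI partition) has not been initialized yet.
  private final AtomicInteger rliFileGroupCount = new AtomicInteger(0);

  // Called once the (simulated) upgrade + MDT init has finished.
  public void onMetadataTableInitialized(int fileGroupCount) {
    rliFileGroupCount.set(fileGroupCount);
  }

  // Hypothetical routing rule: hash the record key into [0, fileGroupCount).
  public int partition(String recordKey) {
    int count = rliFileGroupCount.get();
    if (count <= 0) {
      // Without the count there is no correct target, so fail fast instead of guessing.
      throw new IllegalStateException("RLI file group count unknown: MDT not initialized");
    }
    return Math.floorMod(recordKey.hashCode(), count);
  }

  public static void main(String[] args) {
    RliRoutingSketch p = new RliRoutingSketch();
    try {
      p.partition("key-1");
    } catch (IllegalStateException e) {
      System.out.println("before init: " + e.getMessage());
    }
    p.onMetadataTableInitialized(4);
    System.out.println("after init: key-1 -> file group " + p.partition("key-1"));
  }
}
```

The fail-fast check is only a guard; records would still be rejected until init completes, which is why the pipeline should not start processing input before then.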
Overall, it's not safe to start the pipeline and begin processing input
before the upgrade and MDT initialization have finished.