yihua commented on code in PR #18411:
URL: https://github.com/apache/hudi/pull/18411#discussion_r3034562056


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -385,16 +407,23 @@ public void executionAttemptReady(int i, int attemptNumber, SubtaskGateway gatew
 
   @Override
   public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
-    if (request instanceof Correspondent.InstantTimeRequest) {
-      return handleInstantRequest((Correspondent.InstantTimeRequest) request);
-    }
-    if (request instanceof Correspondent.InflightInstantsRequest) {
-      return handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
-    }
-    if (request instanceof Correspondent.AwaitPendingInstantsRequest) {
-      return handleAwaitPendingInstantsRequest((Correspondent.AwaitPendingInstantsRequest) request);
-    }
-    throw new HoodieException("Unexpected coordination request type: " + request.getClass().getSimpleName());
+    // Gate coordination requests on initialization completion. The upgrade,
+    // metadata table init, and event restoration run asynchronously on the
+    // executor thread (see start()). Coordination requests like startInstant()
+    // run on a separate instantRequestExecutor and must not race ahead of
+    // initialization — e.g., RecordIndexPartitioner depends on MDT init.
+    return initFuture.thenCompose(ignored -> {

Review Comment:
   🤖 Good question. A separate pre-upgrade step would keep the coordinator's 
`start()` simple, but it does change the UX from a transparent on-first-write 
upgrade to an explicit manual step. One middle ground: keep the in-place 
upgrade path but add a standalone CLI tool (or Spark action) that users with 
very large archived timelines can run ahead of time. That way the common case 
stays zero-touch, and only the pathological case (thousands of archived 
actions) needs the separate job.
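The gating pattern from the hunk above (`initFuture.thenCompose(...)`) can be seen in isolation in the following minimal, self-contained sketch. It is not the PR's code. `InitGatedCoordinator`, its `String` requests, and the plain single-thread executor are hypothetical stand-ins for the coordinator, the `Correspondent` request types, and Hudi's executors. A request issued before initialization finishes gets back a pending future, and dispatch only happens once `initFuture` completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the coordinator; not Hudi's StreamWriteOperatorCoordinator.
final class InitGatedCoordinator {
  // Completed once the asynchronous initialization has finished.
  final CompletableFuture<Void> initFuture = new CompletableFuture<>();
  // Single-threaded executor that runs initialization off the caller's thread.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private volatile boolean initialized = false;

  void start() {
    executor.execute(() -> {
      // Stand-in for tryUpgrade(), initMetadataTable(), restoreEvents(), ...
      initialized = true;
      initFuture.complete(null);
    });
  }

  // Dispatch is deferred until initFuture completes, so no request observes
  // a half-initialized coordinator.
  CompletableFuture<String> handleCoordinationRequest(Object request) {
    return initFuture.thenCompose(ignored -> {
      if (request instanceof String) {
        return CompletableFuture.completedFuture(
            "instant for " + request + ", initialized=" + initialized);
      }
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(new IllegalArgumentException(
          "Unexpected coordination request type: " + request.getClass().getSimpleName()));
      return failed;
    });
  }

  void close() throws InterruptedException {
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    InitGatedCoordinator coordinator = new InitGatedCoordinator();
    // Issued before start(): the returned future stays pending.
    CompletableFuture<String> early = coordinator.handleCoordinationRequest("req-1");
    System.out.println("done before start? " + early.isDone()); // false
    coordinator.start();
    System.out.println(early.get(5, TimeUnit.SECONDS)); // instant for req-1, initialized=true
    coordinator.close();
  }
}
```

A plain `thenCompose` runs its continuation on whichever thread completes `initFuture`. In this sketch that is the init executor; if the future is already complete, it is the calling thread. If dispatch must stay on a specific executor (for example `instantRequestExecutor`), `thenComposeAsync(fn, executor)` would pin it there.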



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -244,15 +245,36 @@ public void start() throws Exception {
           .threadFactory(getThreadFactory("instant-request"))
           .exceptionHook((errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
           .build();
-      // start the executor if required
-      if (tableState.syncHive) {
-        initHiveSync();
-      }
-      // start client id heartbeats for optimistic concurrency control
-      if (OptionsResolver.isMultiWriter(conf)) {
-        initClientIds(conf);
-      }
-      restoreEvents();
+
+      // Run upgrade, metadata table init, and event restoration on the executor
+      // thread instead of the Pekko dispatcher thread. Running these synchronously
+      // on the dispatcher thread blocks heartbeat responses when the operations
+      // involve heavy I/O (e.g., LSM timeline migration with hundreds of archived
+      // actions), causing the ResourceManager to disconnect the JobManager.
+      //
+      // Safety guarantees:
+      // - Events via handleEventFromOperator() are submitted to the same
+      //   single-threaded executor, so FIFO ordering ensures initialization

Review Comment:
   🤖 The outer `catch (Throwable throwable)` block at the end of `start()` also 
doesn't complete `initFuture`. If an exception is thrown after the executors 
are created but before the lambda is submitted (or if `executor.execute()` 
itself fails to enqueue), `initFuture` will remain incomplete and coordination 
requests will hang. It might be worth adding 
`initFuture.completeExceptionally(throwable)` in that catch block as well.
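The suggestion is illustrated by the minimal sketch below. The hunk does not show the body of the outer catch, so the sketch assumes `start()` rethrows after failing the gate. `OuterCatchSketch` and the rejecting executor are hypothetical. Completing `initFuture` exceptionally in the outer catch turns a would-be hang into a visible failure for any request chained on it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of start()'s outer catch; not the PR's actual code.
final class OuterCatchSketch {
  final CompletableFuture<Void> initFuture = new CompletableFuture<>();

  void start(Executor executor) throws Exception {
    try {
      // ... executor creation and other setup would happen here ...
      executor.execute(() -> initFuture.complete(null));
    } catch (Throwable throwable) {
      // Fail the gate so requests chained on initFuture do not wait forever.
      initFuture.completeExceptionally(throwable);
      // Rethrowing here is an assumption about the surrounding code.
      throw new Exception("Failed to start coordinator", throwable);
    }
  }

  // Reports the state of the gate as seen by a waiting request.
  static String describe(CompletableFuture<Void> gate) throws InterruptedException {
    try {
      gate.get(1, TimeUnit.SECONDS);
      return "completed";
    } catch (ExecutionException e) {
      return "failed: " + e.getCause().getMessage();
    } catch (TimeoutException e) {
      return "still pending";
    }
  }

  public static void main(String[] args) throws Exception {
    OuterCatchSketch sketch = new OuterCatchSketch();
    // An executor that refuses every task, mimicking a failed enqueue.
    Executor rejecting = task -> {
      throw new RejectedExecutionException("queue full");
    };
    try {
      sketch.start(rejecting);
    } catch (Exception expected) {
      // start() still propagates the failure to its caller.
    }
    System.out.println(describe(sketch.initFuture)); // failed: queue full
  }
}
```

Without the `completeExceptionally` call, `describe` would report "still pending", and a real caller blocked on the gate would never return.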



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -244,15 +245,36 @@ public void start() throws Exception {
           .threadFactory(getThreadFactory("instant-request"))
           .exceptionHook((errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
           .build();
-      // start the executor if required
-      if (tableState.syncHive) {
-        initHiveSync();
-      }
-      // start client id heartbeats for optimistic concurrency control
-      if (OptionsResolver.isMultiWriter(conf)) {
-        initClientIds(conf);
-      }
-      restoreEvents();
+
+      // Run upgrade, metadata table init, and event restoration on the executor
+      // thread instead of the Pekko dispatcher thread. Running these synchronously
+      // on the dispatcher thread blocks heartbeat responses when the operations
+      // involve heavy I/O (e.g., LSM timeline migration with hundreds of archived
+      // actions), causing the ResourceManager to disconnect the JobManager.
+      //
+      // Safety guarantees:
+      // - Events via handleEventFromOperator() are submitted to the same
+      //   single-threaded executor, so FIFO ordering ensures initialization
+      //   completes before any event processing.
+      // - Coordination requests via handleCoordinationRequest() run on the
+      //   separate instantRequestExecutor and are gated on initFuture to
+      //   prevent startInstant() from racing ahead of the upgrade.
+      this.executor.execute(() -> {
+        this.writeClient.tryUpgrade(instant, this.metaClient);
+        initMetadataTable(this.writeClient);
+        if (tableState.scheduleMdtCompaction) {
+          this.metadataWriteClient = StreamerUtil.createMetadataWriteClient(writeClient);
+        }
+        if (tableState.syncHive) {
+          initHiveSync();
+        }

Review Comment:
   🤖 If any of the init steps (tryUpgrade, initMetadataTable, etc.) throw an 
exception, `NonThrownExecutor` swallows it and calls the exception hook, but 
`initFuture.complete(null)` is never reached. This leaves `initFuture` 
permanently incomplete, so any `handleCoordinationRequest` caller already 
waiting on `initFuture.thenCompose(...)` will hang indefinitely. Could you wrap 
the lambda body in try-finally (or try-catch) to ensure 
`initFuture.completeExceptionally(t)` on failure?
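The minimal sketch below shows the try-catch variant. `SwallowingExecutor` is a hypothetical stand-in that only mimics the behavior described above: it catches the task's exception and hands it to a hook. It is not the real `NonThrownExecutor` API. The lambda fails the gate before rethrowing, so waiters are released and the hook still sees the error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch; not Hudi's NonThrownExecutor or coordinator code.
final class GuardedInitSketch {
  static final class SwallowingExecutor {
    private final ExecutorService delegate = Executors.newSingleThreadExecutor();
    private final Consumer<Throwable> exceptionHook;

    SwallowingExecutor(Consumer<Throwable> exceptionHook) {
      this.exceptionHook = exceptionHook;
    }

    void execute(Runnable action) {
      delegate.execute(() -> {
        try {
          action.run();
        } catch (Throwable t) {
          exceptionHook.accept(t); // swallowed: never reaches the submitter
        }
      });
    }

    void shutdown() throws InterruptedException {
      delegate.shutdown();
      delegate.awaitTermination(5, TimeUnit.SECONDS);
    }
  }

  final CompletableFuture<Void> initFuture = new CompletableFuture<>();

  void start(SwallowingExecutor executor, Runnable initSteps) {
    executor.execute(() -> {
      try {
        initSteps.run(); // stand-in for tryUpgrade(), initMetadataTable(), ...
        initFuture.complete(null);
      } catch (Throwable t) {
        // Release any waiters with the failure instead of leaving them hanging,
        initFuture.completeExceptionally(t);
        // then rethrow (precise rethrow: only unchecked types reach here)
        // so the exception hook still sees it, e.g. to fail the job.
        throw t;
      }
    });
  }

  public static void main(String[] args) throws Exception {
    AtomicReference<Throwable> hookSaw = new AtomicReference<>();
    SwallowingExecutor executor = new SwallowingExecutor(hookSaw::set);
    GuardedInitSketch sketch = new GuardedInitSketch();
    // A request chained on the gate before init runs, as in handleCoordinationRequest().
    CompletableFuture<String> request =
        sketch.initFuture.thenCompose(ignored -> CompletableFuture.completedFuture("served"));
    sketch.start(executor, () -> {
      throw new IllegalStateException("upgrade failed");
    });
    try {
      request.join();
    } catch (CompletionException e) {
      System.out.println("request failed: " + e.getCause().getMessage());
    }
    executor.shutdown();
    System.out.println("hook saw: " + hookSaw.get().getMessage());
  }
}
```

Rethrowing after `completeExceptionally` leaves the existing exception-hook path unchanged. A `finally` block that completes the future only if it is not yet done would be an equivalent alternative.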



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
