yuxiqian commented on code in PR #3858:
URL: https://github.com/apache/flink-cdc/pull/3858#discussion_r1914652765


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java:
##########
@@ -253,6 +260,7 @@ public final CompletableFuture<CoordinationResponse> handleCoordinationRequest(
     public final void handleEventFromOperator(
             int subTaskId, int attemptNumber, OperatorEvent event) {
         runInEventLoop(
+                runInEventFromOperatorExecutor,

Review Comment:
   This seems incorrect. `handleEventFromOperator` should be submitted to the
same single-threaded executor as the other methods, so that it is never
scheduled concurrently with other critical methods. Only
`SchemaCoordinator#startSchemaChange` needs to be wrapped in another executor.
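
   The split suggested above can be sketched roughly as follows. This is a minimal, standalone illustration and not the actual `SchemaRegistry` code: the class name, both executor fields, and the string-based event log are assumptions made only for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a coordinator; not the real SchemaRegistry. */
public class ExecutorSplitSketch {
    // All coordinator callbacks share this single thread, so they never overlap.
    private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
    // Only the long-running schema change work runs on a separate executor.
    private final ExecutorService schemaChangeExecutor = Executors.newSingleThreadExecutor();
    private final List<String> log = Collections.synchronizedList(new ArrayList<>());

    /** Operator events stay on the shared single-threaded executor. */
    public void handleEventFromOperator(String event) {
        coordinatorExecutor.submit(() -> log.add("event:" + event));
    }

    /** The request is accepted on the event loop; the heavy part is handed off. */
    public void startSchemaChange(String change) {
        coordinatorExecutor.submit(
                () -> {
                    log.add("request:" + change);
                    schemaChangeExecutor.submit(() -> log.add("applied:" + change));
                });
    }

    /** Drains both executors (event loop first) and returns a copy of the log. */
    public List<String> shutdownAndGetLog() {
        try {
            coordinatorExecutor.shutdown();
            coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS);
            schemaChangeExecutor.shutdown();
            schemaChangeExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }
}
```

   With this layout the event loop thread is free again as soon as the schema change has been handed off, so later operator events are still processed in submission order.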



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java:
##########
@@ -96,6 +97,9 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
     protected transient SchemaManager schemaManager;
     protected transient TableIdRouter router;
 
+    /** Executor service to execute handle event from operator. */
+    private final ExecutorService runInEventFromOperatorExecutor;

Review Comment:
   Keep its name consistent with the one used in the regular schema coordinator.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java:
##########


Review Comment:
   IIUC this problem will only occur if:
   
   * A distributed schema evolution topology is created
   * The flush success event takes a while to finish
   * The last schema operator initiates its request before any flush succeeds
   
   Thus, the handler of `FlushSuccessEvent` will wait for schema evolution to
finish, but the busy-loop is still waiting to collect all flush success
events.
   
   Could you please add a test case to verify this change?
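
   The kind of hang described above can be reproduced in isolation with plain `java.util.concurrent` primitives. The sketch below is a simplified illustration and not the requested Flink CDC test case: the class, the method name, the latch standing in for the flush success event, and the 500 ms timeout are all assumptions chosen for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of "waiting task and its wake-up event share one thread". */
public class FlushWaitSketch {

    /** Returns true if the waiting task saw the flush event before timing out. */
    public static boolean schemaChangeCompletes(boolean separateExecutor) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        ExecutorService schemaChangeExecutor =
                separateExecutor ? Executors.newSingleThreadExecutor() : eventLoop;
        CountDownLatch flushSuccess = new CountDownLatch(1);
        try {
            // The schema change waits until the flush has been reported.
            Callable<Boolean> waitForFlush =
                    () -> flushSuccess.await(500, TimeUnit.MILLISECONDS);
            Future<Boolean> schemaChange = schemaChangeExecutor.submit(waitForFlush);
            // The flush success event is always handled on the event loop.
            eventLoop.submit(flushSuccess::countDown);
            return schemaChange.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
            schemaChangeExecutor.shutdownNow();
        }
    }
}
```

   When both tasks share the single event loop thread, the waiter occupies that thread and the event that would release it stays queued behind it, so the wait can only end by timing out. With a separate executor for the waiting task, the event is handled and the wait completes.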



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java:
##########
@@ -325,6 +337,7 @@ public final void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpoi
      * directly, make sure you're running heavy logics inside, or the entire job might hang!
      */
     protected void runInEventLoop(
+            final ExecutorService coordinatorExecutor,

Review Comment:
   It's nice to allow specifying an `ExecutorService` when calling
`runInEventLoop`. Maybe `regular/SchemaCoordinator` could also invoke it
instead of doing this:
   
   ```java
   schemaChangeThreadPool.submit(
           () -> {
               try {
                   applySchemaChange(originalEvent, deducedSchemaChangeEvents);
               } catch (Throwable t) {
                   failJob(
                           "Schema change applying task",
                           new FlinkRuntimeException(
                                   "Failed to apply schema change event.", t));
                   throw t;
               }
           });
   ```
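
   One possible shape for such a shared helper is sketched below. The full signature of `runInEventLoop` is not visible in this diff, so the parameter list, the `ThrowingAction` interface, and the simplified `failJob` are assumptions for illustration only. Unlike the snippet above, this sketch records the failure without rethrowing it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper; the real runInEventLoop signature may differ. */
public class RunInExecutorSketch {

    /** An action that may throw any Throwable (assumed helper type). */
    @FunctionalInterface
    public interface ThrowingAction {
        void run() throws Throwable;
    }

    /** Records the last failure; stands in for failing the job. */
    public final AtomicReference<String> failure = new AtomicReference<>();

    // Simplified stand-in for the coordinator's failJob(String, Throwable).
    void failJob(String taskName, Throwable cause) {
        failure.set(taskName + ": " + cause.getMessage());
    }

    /** Runs the action on the given executor and reports any failure. */
    public void runInEventLoop(
            ExecutorService executor, ThrowingAction action, String taskName) {
        executor.execute(
                () -> {
                    try {
                        action.run();
                    } catch (Throwable t) {
                        failJob(
                                taskName,
                                new RuntimeException(
                                        "Failed to apply schema change event.", t));
                    }
                });
    }
}
```

   A caller would then pass its own pool, for example `runInEventLoop(schemaChangeThreadPool, () -> applySchemaChange(originalEvent, deducedSchemaChangeEvents), "Schema change applying task")`, and the try/catch boilerplate would live in one place.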



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
