yuxiqian commented on code in PR #3858:
URL: https://github.com/apache/flink-cdc/pull/3858#discussion_r1914652765
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java:
##########
@@ -253,6 +260,7 @@ public final CompletableFuture<CoordinationResponse> handleCoordinationRequest(
public final void handleEventFromOperator(
int subTaskId, int attemptNumber, OperatorEvent event) {
runInEventLoop(
+ runInEventFromOperatorExecutor,
Review Comment:
This seems incorrect. `handleEventFromOperator` should be submitted to the
same single-threaded executor as the other methods, so that it is never
scheduled concurrently with other critical methods. Only
`SchemaCoordinator#startSchemaChange` needs to be wrapped in another executor.
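A minimal sketch of the split I have in mind, not actual Flink CDC code. The class name `ExecutorSplitSketch` and the fields `coordinatorExecutor` and `schemaChangeExecutor` are hypothetical. All event handlers go through one single-threaded executor, and only the long-running schema change stand-in runs elsewhere:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorSplitSketch {

    /** Returns the highest number of event handlers observed running at the same time. */
    public static int maxConcurrentHandlers(int events) {
        // Hypothetical names: one executor for all event handlers, one for the slow task.
        ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
        ExecutorService schemaChangeExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        CountDownLatch handled = new CountDownLatch(events);

        // Stand-in for the long-running schema change: it blocks until every event
        // has been handled, but it does so on its own thread.
        schemaChangeExecutor.execute(() -> awaitQuietly(handled));

        // Event handlers share the single coordinator thread, so they run one by one.
        for (int i = 0; i < events; i++) {
            coordinatorExecutor.execute(() -> {
                maxSeen.accumulateAndGet(running.incrementAndGet(), Math::max);
                running.decrementAndGet();
                handled.countDown();
            });
        }

        coordinatorExecutor.shutdown();
        schemaChangeExecutor.shutdown();
        awaitQuietly(handled);
        return maxSeen.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the handlers never leave the single coordinator thread, the observed concurrency stays at one even while the slow task is blocked on the other executor.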
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java:
##########
@@ -96,6 +97,9 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
protected transient SchemaManager schemaManager;
protected transient TableIdRouter router;
+ /** Executor service to execute handle event from operator. */
+ private final ExecutorService runInEventFromOperatorExecutor;
Review Comment:
Please keep its name consistent with the regular schema coordinator.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java:
##########
Review Comment:
IIUC this problem will only occur if:
* Distributed schema evolution topology is created
* Flush success event takes a while to finish
* The last schema operator initiates request before any flush succeeds
Thus, the handler of `FlushSuccessEvent` waits for schema evolution to
finish, while the busy loop is still waiting to collect all flush success
events.
Could you please add a test case to verify this change?
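As a possible starting point, the scenario can be modelled without Flink. This is only a simplified sketch under my reading of the problem: the class `FlushDeadlockSketch` and its methods are hypothetical, the latch stands in for collecting flush success events, and the waiter uses a bounded wait so the example terminates instead of hanging:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FlushDeadlockSketch {

    /**
     * Runs the waiting loop on waitExecutor and the flush-success handlers on
     * eventExecutor. Returns true if the simulated schema change finishes in time.
     */
    public static boolean completes(
            ExecutorService waitExecutor, ExecutorService eventExecutor, int subtasks) {
        CountDownLatch flushed = new CountDownLatch(subtasks);
        CountDownLatch done = new CountDownLatch(1);

        // Simulated schema change: waits until every subtask reported a flush.
        waitExecutor.execute(() -> {
            try {
                if (flushed.await(500, TimeUnit.MILLISECONDS)) {
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Simulated flush success handlers, one per subtask.
        for (int i = 0; i < subtasks; i++) {
            eventExecutor.execute(flushed::countDown);
        }

        try {
            return done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Both roles share one single-threaded executor: handlers queue behind the waiter. */
    public static boolean sharedExecutorCompletes() {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        try {
            return completes(shared, shared, 2);
        } finally {
            shared.shutdownNow();
        }
    }

    /** The waiter runs on its own executor: handlers are free to make progress. */
    public static boolean separateExecutorsComplete() {
        ExecutorService waiter = Executors.newSingleThreadExecutor();
        ExecutorService events = Executors.newSingleThreadExecutor();
        try {
            return completes(waiter, events, 2);
        } finally {
            waiter.shutdownNow();
            events.shutdownNow();
        }
    }
}
```

In this model `sharedExecutorCompletes()` returns `false` because the handlers cannot run until the waiter gives up, while `separateExecutorsComplete()` returns `true`. A real test would need to drive the actual coordinator rather than latches.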
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java:
##########
@@ -325,6 +337,7 @@ public final void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpoi
* directly, make sure you're running heavy logics inside, or the entire job might hang!
*/
protected void runInEventLoop(
+ final ExecutorService coordinatorExecutor,
Review Comment:
It's nice to allow specifying an `ExecutorService` when
calling `runInEventLoop`. Maybe `regular/SchemaCoordinator` could also
call it, instead of submitting to the thread pool directly like this:
```java
schemaChangeThreadPool.submit(
        () -> {
            try {
                applySchemaChange(originalEvent, deducedSchemaChangeEvents);
            } catch (Throwable t) {
                failJob(
                        "Schema change applying task",
                        new FlinkRuntimeException(
                                "Failed to apply schema change event.", t));
                throw t;
            }
        });
```
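For illustration, a shared helper along these lines would let both coordinators reuse the same failure handling. This is a rough sketch with hypothetical names (`EventLoopSketch`, `ThrowingRunnable`, `failureHandler`), not the actual `runInEventLoop` signature in this PR. It also reports the failure without rethrowing, unlike the snippet above:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class EventLoopSketch {

    /** Hypothetical functional interface for actions that may throw. */
    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    /**
     * Submits the action to the given executor. Any failure is handed to
     * failureHandler together with the task description (a stand-in for failJob).
     */
    public static Future<?> runInEventLoop(
            ExecutorService executor,
            ThrowingRunnable action,
            String description,
            BiConsumer<String, Throwable> failureHandler) {
        Runnable wrapped = () -> {
            try {
                action.run();
            } catch (Throwable t) {
                failureHandler.accept(description, t);
            }
        };
        return executor.submit(wrapped);
    }

    /** Runs one failing action and returns the failures that were reported. */
    public static List<String> demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> failures = new CopyOnWriteArrayList<>();
        try {
            runInEventLoop(
                            pool,
                            () -> {
                                throw new IllegalStateException("boom");
                            },
                            "Schema change applying task",
                            (description, t) ->
                                    failures.add(description + ": " + t.getMessage()))
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            failures.add("unexpected: " + e);
        } finally {
            pool.shutdown();
        }
        return failures;
    }
}
```

With a helper like this, the call site only chooses the executor and the description, and the try/catch block does not need to be repeated in each coordinator.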