yuxiqian commented on code in PR #3563:
URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726511886
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId,
SchemaChangeEvent schemaCh
schemaChangeEvent));
}
- // The request will need to send a FlushEvent or block until flushing
finished
+ // The request will block if another schema change event is being
handled
SchemaChangeResponse response = requestSchemaChange(tableId,
schemaChangeEvent);
- if (!response.getSchemaChangeEvents().isEmpty()) {
- LOG.info(
- "Sending the FlushEvent for table {} in subtask {}.",
- tableId,
- getRuntimeContext().getIndexOfThisSubtask());
+ if (response.isAccepted()) {
+ LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId,
tableId);
Review Comment:
This is intended to imitate how Values data sink prints subTaskId to
distinguish them:
```java
if (print) {
String prefix = numSubtasks > 1 ? subtaskIndex + "> " : "";
// print the detail message to console for verification.
System.out.println(
prefix
+ ValuesDataSinkHelper.convertEventToStr(
event,
fieldGetterMaps.get(((ChangeEvent)
event).tableId())));
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]