loserwang1024 commented on code in PR #3563:
URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726446769


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId, 
SchemaChangeEvent schemaCh
                             schemaChangeEvent));
         }
 
-        // The request will need to send a FlushEvent or block until flushing 
finished
+        // The request will block if another schema change event is being 
handled
         SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
-        if (!response.getSchemaChangeEvents().isEmpty()) {
-            LOG.info(
-                    "Sending the FlushEvent for table {} in subtask {}.",
-                    tableId,
-                    getRuntimeContext().getIndexOfThisSubtask());
+        if (response.isAccepted()) {
+            LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId, 
tableId);
             output.collect(new StreamRecord<>(new FlushEvent(tableId)));
             List<SchemaChangeEvent> expectedSchemaChangeEvents = 
response.getSchemaChangeEvents();
             
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
 
             // The request will block until flushing finished in each sink 
writer
-            ReleaseUpstreamResponse schemaEvolveResponse = 
requestReleaseUpstream();
+            SchemaChangeResultResponse schemaEvolveResponse = 
requestSchemaChangeResult();
             List<SchemaChangeEvent> finishedSchemaChangeEvents =
                     schemaEvolveResponse.getFinishedSchemaChangeEvents();
 
             // Update evolved schema changes based on apply results
             finishedSchemaChangeEvents.forEach(e -> output.collect(new 
StreamRecord<>(e)));
+        } else if (response.isDuplicate()) {
+            LOG.info(
+                    ">{} Schema change event {} has been handled in another 
subTask already.",
+                    subTaskId,
+                    schemaChangeEvent);
+        } else if (response.isIgnored()) {
+            LOG.info(
+                    ">{} Schema change event {} has been ignored. No schema 
evolution needed.",
+                    subTaskId,
+                    schemaChangeEvent);
+        } else {
+            throw new IllegalStateException("Unexpected response status " + 
response);
         }
     }
 
     private SchemaChangeResponse requestSchemaChange(
-            TableId tableId, SchemaChangeEvent schemaChangeEvent) {
-        return sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+            TableId tableId, SchemaChangeEvent schemaChangeEvent)
+            throws InterruptedException, TimeoutException {
+        long schemaEvolveTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;
+        while (true) {
+            SchemaChangeResponse response =
+                    sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+            if (response.isRegistryBusy()) {
+                if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+                    LOG.info(
+                            ">{} Schema Registry is busy now, waiting for next 
request...",

Review Comment:
   What does the "> subtask" prefix mean?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java:
##########
@@ -93,6 +98,97 @@ public SchemaChangeBehavior getBehavior() {
         return behavior;
     }
 
+    /**
+     * This function checks if the given schema change event has been applied 
already. If so, it
+     * will be ignored to avoid sending duplicate evolved schema change events 
to sink metadata
+     * applier.
+     */
+    public final boolean 
isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) {
+        TableId tableId = event.tableId();
+        Optional<Schema> latestSchema = getLatestOriginalSchema(tableId);
+        return Boolean.TRUE.equals(
+                SchemaChangeEventVisitor.visit(

Review Comment:
   I wonder whether we should do this in the registry or in the sink. @leonardBang, WDYT?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId, 
SchemaChangeEvent schemaCh
                             schemaChangeEvent));
         }
 
-        // The request will need to send a FlushEvent or block until flushing 
finished
+        // The request will block if another schema change event is being 
handled
         SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
-        if (!response.getSchemaChangeEvents().isEmpty()) {
-            LOG.info(
-                    "Sending the FlushEvent for table {} in subtask {}.",
-                    tableId,
-                    getRuntimeContext().getIndexOfThisSubtask());
+        if (response.isAccepted()) {
+            LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId, 
tableId);
             output.collect(new StreamRecord<>(new FlushEvent(tableId)));
             List<SchemaChangeEvent> expectedSchemaChangeEvents = 
response.getSchemaChangeEvents();
             
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
 
             // The request will block until flushing finished in each sink 
writer
-            ReleaseUpstreamResponse schemaEvolveResponse = 
requestReleaseUpstream();
+            SchemaChangeResultResponse schemaEvolveResponse = 
requestSchemaChangeResult();
             List<SchemaChangeEvent> finishedSchemaChangeEvents =
                     schemaEvolveResponse.getFinishedSchemaChangeEvents();
 
             // Update evolved schema changes based on apply results
             finishedSchemaChangeEvents.forEach(e -> output.collect(new 
StreamRecord<>(e)));
+        } else if (response.isDuplicate()) {
+            LOG.info(
+                    ">{} Schema change event {} has been handled in another 
subTask already.",
+                    subTaskId,
+                    schemaChangeEvent);
+        } else if (response.isIgnored()) {
+            LOG.info(
+                    ">{} Schema change event {} has been ignored. No schema 
evolution needed.",
+                    subTaskId,
+                    schemaChangeEvent);
+        } else {
+            throw new IllegalStateException("Unexpected response status " + 
response);
         }
     }
 
     private SchemaChangeResponse requestSchemaChange(
-            TableId tableId, SchemaChangeEvent schemaChangeEvent) {
-        return sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+            TableId tableId, SchemaChangeEvent schemaChangeEvent)
+            throws InterruptedException, TimeoutException {
+        long schemaEvolveTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;

Review Comment:
   Please expose DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT. If a user restarts a job 
in binlog mode, a lot of create table events will be sent, and some operators 
may wait for a long time.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId, 
SchemaChangeEvent schemaCh
                             schemaChangeEvent));
         }
 
-        // The request will need to send a FlushEvent or block until flushing 
finished
+        // The request will block if another schema change event is being 
handled
         SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
-        if (!response.getSchemaChangeEvents().isEmpty()) {
-            LOG.info(
-                    "Sending the FlushEvent for table {} in subtask {}.",
-                    tableId,
-                    getRuntimeContext().getIndexOfThisSubtask());
+        if (response.isAccepted()) {
+            LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId, 
tableId);

Review Comment:
   Does the ">" here mean "larger than taskId"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to