yuxiqian commented on code in PR #3563:
URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726507995


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId, 
SchemaChangeEvent schemaCh
                             schemaChangeEvent));
         }
 
-        // The request will need to send a FlushEvent or block until flushing 
finished
+        // The request will block if another schema change event is being 
handled
         SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
-        if (!response.getSchemaChangeEvents().isEmpty()) {
-            LOG.info(
-                    "Sending the FlushEvent for table {} in subtask {}.",
-                    tableId,
-                    getRuntimeContext().getIndexOfThisSubtask());
+        if (response.isAccepted()) {
+            LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId, 
tableId);
             output.collect(new StreamRecord<>(new FlushEvent(tableId)));
             List<SchemaChangeEvent> expectedSchemaChangeEvents = 
response.getSchemaChangeEvents();
             
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
 
             // The request will block until flushing finished in each sink 
writer
-            ReleaseUpstreamResponse schemaEvolveResponse = 
requestReleaseUpstream();
+            SchemaChangeResultResponse schemaEvolveResponse = 
requestSchemaChangeResult();
             List<SchemaChangeEvent> finishedSchemaChangeEvents =
                     schemaEvolveResponse.getFinishedSchemaChangeEvents();
 
             // Update evolved schema changes based on apply results
             finishedSchemaChangeEvents.forEach(e -> output.collect(new 
StreamRecord<>(e)));
+        } else if (response.isDuplicate()) {
+            LOG.info(
+                    ">{} Schema change event {} has been handled in another 
subTask already.",
+                    subTaskId,
+                    schemaChangeEvent);
+        } else if (response.isIgnored()) {
+            LOG.info(
+                    ">{} Schema change event {} has been ignored. No schema 
evolution needed.",
+                    subTaskId,
+                    schemaChangeEvent);
+        } else {
+            throw new IllegalStateException("Unexpected response status " + 
response);
         }
     }
 
     private SchemaChangeResponse requestSchemaChange(
-            TableId tableId, SchemaChangeEvent schemaChangeEvent) {
-        return sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+            TableId tableId, SchemaChangeEvent schemaChangeEvent)
+            throws InterruptedException, TimeoutException {
+        long schemaEvolveTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;

Review Comment:
   This timeout has been exposed as the pipeline configuration option
`schema-operator.rpc-timeout`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to