Shawn-Hx commented on code in PR #3680:
URL: https://github.com/apache/flink-cdc/pull/3680#discussion_r1881526319


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java:
##########
@@ -64,4 +67,18 @@ public static String getServerId(int parallelism) {
     }
 
     private MySqSourceTestUtils() {}
+
+    public static void loopCheck(

Review Comment:
   nit: move this method before constructor



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##########
@@ -131,53 +134,45 @@ public SchemaRegistryRequestHandler(
     public void handleSchemaChangeRequest(
             SchemaChangeRequest request, 
CompletableFuture<CoordinationResponse> response) {
 
-        // We use requester subTask ID as the pending ticket, because there 
will be at most 1 schema
-        // change requests simultaneously from each subTask
-        int requestSubTaskId = request.getSubTaskId();
+        // We use nonce to identify each schema change request
+        long nonce = request.getNonce();
 
         synchronized (schemaChangeRequestLock) {
             // Make sure we handle the first request in the pending list to 
avoid out-of-order
             // waiting and blocks checkpointing mechanism.
             if (schemaChangeStatus == RequestStatus.IDLE) {
-                if (pendingSubTaskIds.isEmpty()) {
-                    LOG.info(
-                            "Received schema change event request {} from 
table {} from subTask {}. Pending list is empty, handling this.",
-                            request.getSchemaChangeEvent(),
-                            request.getTableId().toString(),
-                            requestSubTaskId);
-                } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
-                    LOG.info(
-                            "Received schema change event request {} from 
table {} from subTask {}. It is on the first of the pending list, handling 
this.",
-                            request.getSchemaChangeEvent(),
-                            request.getTableId().toString(),
-                            requestSubTaskId);
-                    pendingSubTaskIds.remove(0);
-                } else {
-                    LOG.info(
-                            "Received schema change event request {} from 
table {} from subTask {}. It is not the first of the pending list ({}).",
-                            request.getSchemaChangeEvent(),
-                            request.getTableId().toString(),
-                            requestSubTaskId,
-                            pendingSubTaskIds);
-                    if (!pendingSubTaskIds.contains(requestSubTaskId)) {
-                        pendingSubTaskIds.add(requestSubTaskId);
-                    }
-                    response.complete(wrap(SchemaChangeResponse.busy()));
-                    return;
-                }
-
                 SchemaChangeEvent event = request.getSchemaChangeEvent();
 
                 // If this schema change event has been requested by another 
subTask, ignore it.
                 if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) 
{
                     LOG.info("Event {} has been addressed before, ignoring 
it.", event);
-                    clearCurrentSchemaChangeRequest();
+                    clearCurrentSchemaChangeRequest(nonce);
                     LOG.info(
                             "SchemaChangeStatus switched from 
WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",

Review Comment:
   I think the log message needs to be updated because there is no 
WAITING_FOR_FLUSH status anymore.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -473,37 +485,44 @@ private void handleSchemaChangeEvent(TableId tableId, 
SchemaChangeEvent schemaCh
     }
 
     private SchemaChangeResponse requestSchemaChange(
-            TableId tableId, SchemaChangeEvent schemaChangeEvent)
+            TableId tableId, SchemaChangeEvent schemaChangeEvent, long nonce)
             throws InterruptedException, TimeoutException {
         long schemaEvolveTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;
         while (true) {
             SchemaChangeResponse response =
                     sendRequestToCoordinator(
-                            new SchemaChangeRequest(tableId, 
schemaChangeEvent, subTaskId));
-            if (response.isRegistryBusy()) {
-                if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+                            new SchemaChangeRequest(tableId, 
schemaChangeEvent, subTaskId, nonce));
+            if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+                if (response.isRegistryBusy()) {
                     LOG.info(
                             "{}> Schema Registry is busy now, waiting for next 
request...",
                             subTaskId);
                     Thread.sleep(1000);
+                } else if (response.isWaitingForFlush()) {
+                    LOG.info(
+                            "{}> Schema change event (with once {}) has not 
collected enough flush success events from writers, waiting...",

Review Comment:
   once -> nonce



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##########
@@ -186,7 +181,7 @@ public void handleSchemaChangeRequest(
                 // route strategies, ignore it.
                 if (derivedSchemaChangeEvents.isEmpty()) {
                     LOG.info("Event {} is omitted from sending to downstream, 
ignoring it.", event);
-                    clearCurrentSchemaChangeRequest();
+                    clearCurrentSchemaChangeRequest(nonce);
                     LOG.info(
                             "SchemaChangeStatus switched from 
WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",

Review Comment:
   As mentioned above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to