This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.4
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.4 by this push:
     new 1bc596aee [FLINK-37462][cdc-runtime] Fix consecutive schema change requests might be overwritten
1bc596aee is described below

commit 1bc596aee8ae349af3dde3477530af4fb917b033
Author: yuxiqian <[email protected]>
AuthorDate: Tue Apr 29 13:58:39 2025 +0800

    [FLINK-37462][cdc-runtime] Fix consecutive schema change requests might be overwritten
    
    This closes #4010
---
 .../runtime/operators/schema/regular/SchemaCoordinator.java    | 10 ++++++----
 .../cdc/runtime/operators/schema/regular/SchemaOperator.java   |  3 ++-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
index 92ae16b8c..53344dc87 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
@@ -419,15 +419,17 @@ public class SchemaCoordinator extends SchemaRegistry {
                     tableId, 
schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
         }
 
-        // And returns all successfully applied schema change events to 
SchemaOperator.
-        responseFuture.complete(
-                wrap(new SchemaChangeResponse(appliedSchemaChangeEvents, 
refreshedEvolvedSchemas)));
-
         pendingRequests.remove(sourceSubTaskId);
+
         LOG.info(
                 "Finished handling schema change request from {}. Pending 
requests: {}",
                 sourceSubTaskId,
                 pendingRequests);
+
+        // We release the response future at last to avoid leaking internal 
states to SchemaOperator
+        // client accidentally.
+        responseFuture.complete(
+                wrap(new SchemaChangeResponse(appliedSchemaChangeEvents, 
refreshedEvolvedSchemas)));
     }
 
     private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent 
schemaChangeEvent) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index 28195f238..feb1a38d5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -167,10 +167,11 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
         output.collect(
                 new StreamRecord<>(new FlushEvent(subTaskId, sinkTables, 
originalEvent.getType())));
 
+        LOG.info("{}> Going to request schema change...", subTaskId);
+
         // Then, queue to request schema change to SchemaCoordinator.
         SchemaChangeResponse response = requestSchemaChange(tableId, 
originalEvent);
 
-        LOG.info("{}> Successfully requested schema change.", subTaskId);
         LOG.info(
                 "{}> Finished schema change events: {}",
                 subTaskId,

Reply via email to