This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-3.4
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.4 by this push:
new 1bc596aee [FLINK-37462][cdc-runtime] Fix consecutive schema change requests might be overwritten
1bc596aee is described below
commit 1bc596aee8ae349af3dde3477530af4fb917b033
Author: yuxiqian <[email protected]>
AuthorDate: Tue Apr 29 13:58:39 2025 +0800
[FLINK-37462][cdc-runtime] Fix consecutive schema change requests might be overwritten
This closes #4010
---
.../runtime/operators/schema/regular/SchemaCoordinator.java | 10 ++++++----
.../cdc/runtime/operators/schema/regular/SchemaOperator.java | 3 ++-
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
index 92ae16b8c..53344dc87 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
@@ -419,15 +419,17 @@ public class SchemaCoordinator extends SchemaRegistry {
tableId,
schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
}
- // And returns all successfully applied schema change events to SchemaOperator.
- responseFuture.complete(
- wrap(new SchemaChangeResponse(appliedSchemaChangeEvents, refreshedEvolvedSchemas)));
-
pendingRequests.remove(sourceSubTaskId);
+
LOG.info(
"Finished handling schema change request from {}. Pending requests: {}",
sourceSubTaskId,
pendingRequests);
+
+ // We release the response future at last to avoid leaking internal states to SchemaOperator
+ // client accidentally.
+ responseFuture.complete(
+ wrap(new SchemaChangeResponse(appliedSchemaChangeEvents, refreshedEvolvedSchemas)));
}
private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index 28195f238..feb1a38d5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -167,10 +167,11 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
output.collect(
new StreamRecord<>(new FlushEvent(subTaskId, sinkTables, originalEvent.getType())));
+ LOG.info("{}> Going to request schema change...", subTaskId);
+
// Then, queue to request schema change to SchemaCoordinator.
SchemaChangeResponse response = requestSchemaChange(tableId, originalEvent);
- LOG.info("{}> Successfully requested schema change.", subTaskId);
LOG.info(
"{}> Finished schema change events: {}",
subTaskId,