This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 474339935 [FLINK-37578][cdc-runtime] Fix distributed schema registry 
exposes bad internal state accidentally
474339935 is described below

commit 47433993575d222808eb9cabe17b63c79270ae3d
Author: yuxiqian <[email protected]>
AuthorDate: Mon Apr 21 16:42:47 2025 +0800

    [FLINK-37578][cdc-runtime] Fix distributed schema registry exposes bad 
internal state accidentally
    
    This closes #3972
    
    Co-authored-by: linjc13 <[email protected]>
---
 .../schema/distributed/SchemaCoordinator.java      | 75 ++++++++++++++++++----
 .../schema/distributed/SchemaOperator.java         | 12 +---
 .../distributed/event/SchemaChangeRequest.java     | 12 ++++
 .../distributed/event/SchemaChangeResponse.java    | 33 +++++++---
 4 files changed, 102 insertions(+), 30 deletions(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
index 6d49653ba..8626eafbc 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
@@ -55,12 +55,14 @@ import java.io.DataOutputStream;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -98,6 +100,9 @@ public class SchemaCoordinator extends SchemaRegistry {
     private transient Multimap<Tuple2<Integer, SchemaChangeEvent>, Integer>
             alreadyHandledSchemaChangeEvents;
 
+    /** Executor service to execute schema change. */
+    private final ExecutorService schemaChangeThreadPool;
+
     public SchemaCoordinator(
             String operatorName,
             OperatorCoordinator.Context context,
@@ -114,6 +119,7 @@ public class SchemaCoordinator extends SchemaRegistry {
                 routingRules,
                 schemaChangeBehavior,
                 rpcTimeout);
+        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
     }
 
     // -----------------
@@ -131,6 +137,14 @@ public class SchemaCoordinator extends SchemaRegistry {
                 "Started SchemaRegistry for {}. Parallelism: {}", 
operatorName, currentParallelism);
     }
 
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (schemaChangeThreadPool != null && 
!schemaChangeThreadPool.isShutdown()) {
+            schemaChangeThreadPool.shutdownNow();
+        }
+    }
+
     // --------------------------
     // Checkpoint related methods
     // --------------------------
@@ -268,7 +282,20 @@ public class SchemaCoordinator extends SchemaRegistry {
             LOG.info(
                     "Received the last required schema change request {}. 
Switching from WAITING_FOR_FLUSH to EVOLVING.",
                     request);
-            startSchemaChange();
+
+            schemaChangeThreadPool.submit(
+                    () -> {
+                        try {
+                            startSchemaChange();
+                        } catch (Throwable t) {
+                            failJob(
+                                    "Schema change applying task",
+                                    new FlinkRuntimeException(
+                                            "Failed to apply schema change 
event.", t));
+                            throw new FlinkRuntimeException(
+                                    "Failed to apply schema change event.", t);
+                        }
+                    });
         }
     }
 
@@ -301,34 +328,56 @@ public class SchemaCoordinator extends SchemaRegistry {
         LOG.info("All flushed. Going to evolve schema for pending requests: 
{}", pendingRequests);
         flushedSinkWriters.clear();
 
-        // Deduce what schema change events should be applied to sink table
-        List<SchemaChangeEvent> deducedSchemaChangeEvents = 
deduceEvolvedSchemaChanges();
+        // Deduce what schema change events should be applied to sink table, 
and affected sink
+        // tables' schema
+        Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceSummary = 
deduceEvolvedSchemaChanges();
 
         // And tries to apply it to external system
         List<SchemaChangeEvent> successfullyAppliedSchemaChangeEvents = new 
ArrayList<>();
-        for (SchemaChangeEvent appliedSchemaChangeEvent : 
deducedSchemaChangeEvents) {
+        for (SchemaChangeEvent appliedSchemaChangeEvent : deduceSummary.f1) {
             if (applyAndUpdateEvolvedSchemaChange(appliedSchemaChangeEvent)) {
                 
successfullyAppliedSchemaChangeEvents.add(appliedSchemaChangeEvent);
             }
         }
 
-        // Then, we broadcast affected schema changes to mapper and release 
upstream
-        pendingRequests.forEach(
-                (subTaskId, tuple) -> {
-                    LOG.info("Coordinator finishes pending future from {}", 
subTaskId);
-                    tuple.f1.complete(
-                            wrap(new 
SchemaChangeResponse(successfullyAppliedSchemaChangeEvents)));
-                });
+        // Fetch refreshed view for affected tables. We can't rely on operator 
clients to do this
+        // because it might not have a complete schema view after restoring 
from previous states.
+        Set<TableId> affectedTableIds = deduceSummary.f0;
+        Map<TableId, Schema> evolvedSchemaView = new HashMap<>();
+        for (TableId tableId : affectedTableIds) {
+            schemaManager
+                    .getLatestEvolvedSchema(tableId)
+                    .ifPresent(schema -> evolvedSchemaView.put(tableId, 
schema));
+        }
+
+        List<Tuple2<SchemaChangeRequest, 
CompletableFuture<CoordinationResponse>>> futures =
+                new ArrayList<>(pendingRequests.values());
 
+        // Restore coordinator internal states first...
         pendingRequests.clear();
 
         LOG.info("Finished schema evolving. Switching from EVOLVING to IDLE.");
         Preconditions.checkState(
                 evolvingStatus.compareAndSet(RequestStatus.EVOLVING, 
RequestStatus.IDLE),
                 "RequestStatus should be EVOLVING when schema evolving 
finishes.");
+
+        // ... and broadcast affected schema changes to mapper and release 
upstream then.
+        // Make sure we've cleaned-up internal state before this, or we may 
receive new requests in
+        // a dirty state.
+        futures.forEach(
+                tuple -> {
+                    LOG.info(
+                            "Coordinator finishes pending future from {}",
+                            tuple.f0.getSinkSubTaskId());
+                    tuple.f1.complete(
+                            wrap(
+                                    new SchemaChangeResponse(
+                                            evolvedSchemaView,
+                                            
successfullyAppliedSchemaChangeEvents)));
+                });
     }
 
-    private List<SchemaChangeEvent> deduceEvolvedSchemaChanges() {
+    private Tuple2<Set<TableId>, List<SchemaChangeEvent>> 
deduceEvolvedSchemaChanges() {
         List<SchemaChangeRequest> validSchemaChangeRequests =
                 pendingRequests.values().stream()
                         .map(e -> e.f0)
@@ -408,7 +457,7 @@ public class SchemaCoordinator extends SchemaRegistry {
             evolvedSchemaChanges.addAll(normalizedEvents);
         }
 
-        return evolvedSchemaChanges;
+        return Tuple2.of(affectedSinkTableIds, evolvedSchemaChanges);
     }
 
     private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent 
schemaChangeEvent) {
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index 80adea3f0..f31cbabdc 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -205,17 +205,11 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
         LOG.info("{}> Evolve request response: {}", subTaskId, response);
 
         // Update local evolved schema cache
-        response.getSchemaEvolveResult()
-                .forEach(
-                        schemaChangeEvent ->
-                                evolvedSchemaMap.compute(
-                                        schemaChangeEvent.tableId(),
-                                        (tableId, schema) ->
-                                                
SchemaUtils.applySchemaChangeEvent(
-                                                        schema, 
schemaChangeEvent)));
+        evolvedSchemaMap.putAll(response.getEvolvedSchemas());
 
         // And emit schema change events to downstream
-        response.getSchemaEvolveResult().forEach(evt -> output.collect(new 
StreamRecord<>(evt)));
+        response.getEvolvedSchemaChangeEvents()
+                .forEach(evt -> output.collect(new StreamRecord<>(evt)));
         LOG.info(
                 "{}> Successfully updated evolved schema cache. Current state: 
{}",
                 subTaskId,
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
index e287f7874..a40986974 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
@@ -65,4 +65,16 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
                 !isNoOpRequest(), "Unable to fetch source subTaskId for an 
align event.");
         return schemaChangeEvent;
     }
+
+    @Override
+    public String toString() {
+        return "SchemaChangeRequest{"
+                + "sourceSubTaskId="
+                + sourceSubTaskId
+                + ", sinkSubTaskId="
+                + sinkSubTaskId
+                + ", schemaChangeEvent="
+                + schemaChangeEvent
+                + '}';
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
index 1c0b7eaab..69264edd3 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
@@ -18,23 +18,34 @@
 package org.apache.flink.cdc.runtime.operators.schema.distributed.event;
 
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
 import 
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /** Response from a {@link SchemaCoordinator} to broadcast a coordination 
consensus. */
 public class SchemaChangeResponse implements CoordinationResponse {
 
-    private final List<SchemaChangeEvent> schemaEvolveResult;
+    private final Map<TableId, Schema> evolvedSchemas;
+    private final List<SchemaChangeEvent> evolvedSchemaChangeEvents;
 
-    public SchemaChangeResponse(List<SchemaChangeEvent> schemaEvolveResult) {
-        this.schemaEvolveResult = schemaEvolveResult;
+    public SchemaChangeResponse(
+            Map<TableId, Schema> evolvedSchemas,
+            List<SchemaChangeEvent> evolvedSchemaChangeEvents) {
+        this.evolvedSchemas = evolvedSchemas;
+        this.evolvedSchemaChangeEvents = evolvedSchemaChangeEvents;
     }
 
-    public List<SchemaChangeEvent> getSchemaEvolveResult() {
-        return schemaEvolveResult;
+    public Map<TableId, Schema> getEvolvedSchemas() {
+        return evolvedSchemas;
+    }
+
+    public List<SchemaChangeEvent> getEvolvedSchemaChangeEvents() {
+        return evolvedSchemaChangeEvents;
     }
 
     @Override
@@ -43,16 +54,22 @@ public class SchemaChangeResponse implements 
CoordinationResponse {
             return false;
         }
         SchemaChangeResponse that = (SchemaChangeResponse) o;
-        return Objects.equals(schemaEvolveResult, that.schemaEvolveResult);
+        return Objects.equals(evolvedSchemas, that.evolvedSchemas)
+                && Objects.equals(evolvedSchemaChangeEvents, 
that.evolvedSchemaChangeEvents);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(schemaEvolveResult);
+        return Objects.hash(evolvedSchemas, evolvedSchemaChangeEvents);
     }
 
     @Override
     public String toString() {
-        return "SchemaChangeResponse{" + "schemaEvolveResult=" + 
schemaEvolveResult + '}';
+        return "SchemaChangeResponse{"
+                + "evolvedSchemas="
+                + evolvedSchemas
+                + ", evolvedSchemaChangeEvents="
+                + evolvedSchemaChangeEvents
+                + '}';
     }
 }

Reply via email to