This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 474339935 [FLINK-37578][cdc-runtime] Fix distributed schema registry exposes bad internal state accidentally
474339935 is described below
commit 47433993575d222808eb9cabe17b63c79270ae3d
Author: yuxiqian <[email protected]>
AuthorDate: Mon Apr 21 16:42:47 2025 +0800
[FLINK-37578][cdc-runtime] Fix distributed schema registry exposes bad internal state accidentally
This closes #3972
Co-authored-by: linjc13 <[email protected]>
---
.../schema/distributed/SchemaCoordinator.java | 75 ++++++++++++++++++----
.../schema/distributed/SchemaOperator.java | 12 +---
.../distributed/event/SchemaChangeRequest.java | 12 ++++
.../distributed/event/SchemaChangeResponse.java | 33 +++++++---
4 files changed, 102 insertions(+), 30 deletions(-)
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
index 6d49653ba..8626eafbc 100755
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
@@ -55,12 +55,14 @@ import java.io.DataOutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -98,6 +100,9 @@ public class SchemaCoordinator extends SchemaRegistry {
private transient Multimap<Tuple2<Integer, SchemaChangeEvent>, Integer>
alreadyHandledSchemaChangeEvents;
+ /** Executor service to execute schema change. */
+ private final ExecutorService schemaChangeThreadPool;
+
public SchemaCoordinator(
String operatorName,
OperatorCoordinator.Context context,
@@ -114,6 +119,7 @@ public class SchemaCoordinator extends SchemaRegistry {
routingRules,
schemaChangeBehavior,
rpcTimeout);
+ this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
}
// -----------------
@@ -131,6 +137,14 @@ public class SchemaCoordinator extends SchemaRegistry {
"Started SchemaRegistry for {}. Parallelism: {}",
operatorName, currentParallelism);
}
+ @Override
+ public void close() throws Exception {
+ super.close();
+        if (schemaChangeThreadPool != null && !schemaChangeThreadPool.isShutdown()) {
+ schemaChangeThreadPool.shutdownNow();
+ }
+ }
+
// --------------------------
// Checkpoint related methods
// --------------------------
@@ -268,7 +282,20 @@ public class SchemaCoordinator extends SchemaRegistry {
LOG.info(
"Received the last required schema change request {}. Switching from WAITING_FOR_FLUSH to EVOLVING.",
request);
- startSchemaChange();
+
+ schemaChangeThreadPool.submit(
+ () -> {
+ try {
+ startSchemaChange();
+ } catch (Throwable t) {
+ failJob(
+ "Schema change applying task",
+ new FlinkRuntimeException(
+                                        "Failed to apply schema change event.", t));
+ throw new FlinkRuntimeException(
+ "Failed to apply schema change event.", t);
+ }
+ });
}
}
@@ -301,34 +328,56 @@ public class SchemaCoordinator extends SchemaRegistry {
LOG.info("All flushed. Going to evolve schema for pending requests: {}", pendingRequests);
flushedSinkWriters.clear();
- // Deduce what schema change events should be applied to sink table
-        List<SchemaChangeEvent> deducedSchemaChangeEvents = deduceEvolvedSchemaChanges();
+        // Deduce what schema change events should be applied to sink table, and affected sink
+ // tables' schema
+        Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceSummary = deduceEvolvedSchemaChanges();
// And tries to apply it to external system
List<SchemaChangeEvent> successfullyAppliedSchemaChangeEvents = new ArrayList<>();
-        for (SchemaChangeEvent appliedSchemaChangeEvent : deducedSchemaChangeEvents) {
+ for (SchemaChangeEvent appliedSchemaChangeEvent : deduceSummary.f1) {
if (applyAndUpdateEvolvedSchemaChange(appliedSchemaChangeEvent)) {
successfullyAppliedSchemaChangeEvents.add(appliedSchemaChangeEvent);
}
}
-        // Then, we broadcast affected schema changes to mapper and release upstream
- pendingRequests.forEach(
- (subTaskId, tuple) -> {
-                    LOG.info("Coordinator finishes pending future from {}", subTaskId);
- tuple.f1.complete(
-                            wrap(new SchemaChangeResponse(successfullyAppliedSchemaChangeEvents)));
- });
+        // Fetch refreshed view for affected tables. We can't rely on operator clients to do this
+        // because it might not have a complete schema view after restoring from previous states.
+ Set<TableId> affectedTableIds = deduceSummary.f0;
+ Map<TableId, Schema> evolvedSchemaView = new HashMap<>();
+ for (TableId tableId : affectedTableIds) {
+ schemaManager
+ .getLatestEvolvedSchema(tableId)
+                    .ifPresent(schema -> evolvedSchemaView.put(tableId, schema));
+ }
+
+        List<Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>> futures =
+                new ArrayList<>(pendingRequests.values());
+ // Restore coordinator internal states first...
pendingRequests.clear();
LOG.info("Finished schema evolving. Switching from EVOLVING to IDLE.");
Preconditions.checkState(
evolvingStatus.compareAndSet(RequestStatus.EVOLVING, RequestStatus.IDLE),
"RequestStatus should be EVOLVING when schema evolving finishes.");
+
+        // ... and broadcast affected schema changes to mapper and release upstream then.
+        // Make sure we've cleaned-up internal state before this, or we may receive new requests in
+ // a dirty state.
+ futures.forEach(
+ tuple -> {
+ LOG.info(
+ "Coordinator finishes pending future from {}",
+ tuple.f0.getSinkSubTaskId());
+ tuple.f1.complete(
+ wrap(
+ new SchemaChangeResponse(
+ evolvedSchemaView,
+                                                successfullyAppliedSchemaChangeEvents)));
+ });
}
- private List<SchemaChangeEvent> deduceEvolvedSchemaChanges() {
+    private Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceEvolvedSchemaChanges() {
List<SchemaChangeRequest> validSchemaChangeRequests =
pendingRequests.values().stream()
.map(e -> e.f0)
@@ -408,7 +457,7 @@ public class SchemaCoordinator extends SchemaRegistry {
evolvedSchemaChanges.addAll(normalizedEvents);
}
- return evolvedSchemaChanges;
+ return Tuple2.of(affectedSinkTableIds, evolvedSchemaChanges);
}
private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent
schemaChangeEvent) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index 80adea3f0..f31cbabdc 100755
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -205,17 +205,11 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
LOG.info("{}> Evolve request response: {}", subTaskId, response);
// Update local evolved schema cache
- response.getSchemaEvolveResult()
- .forEach(
- schemaChangeEvent ->
- evolvedSchemaMap.compute(
- schemaChangeEvent.tableId(),
- (tableId, schema) ->
-                                                SchemaUtils.applySchemaChangeEvent(
-                                                        schema, schemaChangeEvent)));
+ evolvedSchemaMap.putAll(response.getEvolvedSchemas());
// And emit schema change events to downstream
-        response.getSchemaEvolveResult().forEach(evt -> output.collect(new StreamRecord<>(evt)));
+ response.getEvolvedSchemaChangeEvents()
+ .forEach(evt -> output.collect(new StreamRecord<>(evt)));
LOG.info(
"{}> Successfully updated evolved schema cache. Current state: {}",
subTaskId,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
index e287f7874..a40986974 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
@@ -65,4 +65,16 @@ public class SchemaChangeRequest implements CoordinationRequest {
!isNoOpRequest(), "Unable to fetch source subTaskId for an align event.");
return schemaChangeEvent;
}
+
+ @Override
+ public String toString() {
+ return "SchemaChangeRequest{"
+ + "sourceSubTaskId="
+ + sourceSubTaskId
+ + ", sinkSubTaskId="
+ + sinkSubTaskId
+ + ", schemaChangeEvent="
+ + schemaChangeEvent
+ + '}';
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
index 1c0b7eaab..69264edd3 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
@@ -18,23 +18,34 @@
package org.apache.flink.cdc.runtime.operators.schema.distributed.event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/** Response from a {@link SchemaCoordinator} to broadcast a coordination consensus. */
public class SchemaChangeResponse implements CoordinationResponse {
- private final List<SchemaChangeEvent> schemaEvolveResult;
+ private final Map<TableId, Schema> evolvedSchemas;
+ private final List<SchemaChangeEvent> evolvedSchemaChangeEvents;
- public SchemaChangeResponse(List<SchemaChangeEvent> schemaEvolveResult) {
- this.schemaEvolveResult = schemaEvolveResult;
+ public SchemaChangeResponse(
+ Map<TableId, Schema> evolvedSchemas,
+ List<SchemaChangeEvent> evolvedSchemaChangeEvents) {
+ this.evolvedSchemas = evolvedSchemas;
+ this.evolvedSchemaChangeEvents = evolvedSchemaChangeEvents;
}
- public List<SchemaChangeEvent> getSchemaEvolveResult() {
- return schemaEvolveResult;
+ public Map<TableId, Schema> getEvolvedSchemas() {
+ return evolvedSchemas;
+ }
+
+ public List<SchemaChangeEvent> getEvolvedSchemaChangeEvents() {
+ return evolvedSchemaChangeEvents;
}
@Override
@@ -43,16 +54,22 @@ public class SchemaChangeResponse implements CoordinationResponse {
return false;
}
SchemaChangeResponse that = (SchemaChangeResponse) o;
- return Objects.equals(schemaEvolveResult, that.schemaEvolveResult);
+ return Objects.equals(evolvedSchemas, that.evolvedSchemas)
+                && Objects.equals(evolvedSchemaChangeEvents, that.evolvedSchemaChangeEvents);
}
@Override
public int hashCode() {
- return Objects.hash(schemaEvolveResult);
+ return Objects.hash(evolvedSchemas, evolvedSchemaChangeEvents);
}
@Override
public String toString() {
-        return "SchemaChangeResponse{" + "schemaEvolveResult=" + schemaEvolveResult + '}';
+ return "SchemaChangeResponse{"
+ + "evolvedSchemas="
+ + evolvedSchemas
+ + ", evolvedSchemaChangeEvents="
+ + evolvedSchemaChangeEvents
+ + '}';
}
}