This is an automated email from the ASF dual-hosted git repository.
yuxiqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 3e1211e27 [FLINK-39696][runtime] Use source partition for distributed
flush events (#4400)
3e1211e27 is described below
commit 3e1211e271ddb0335eb45a0dc16b1d83616b5e90
Author: Ran Tao <[email protected]>
AuthorDate: Mon May 18 15:00:37 2026 +0800
[FLINK-39696][runtime] Use source partition for distributed flush events
(#4400)
---
.../schema/distributed/SchemaOperator.java | 9 +++++--
.../schema/distributed/SchemaEvolveTest.java | 31 ++++++++++++++++++++++
.../DistributedEventOperatorTestHarness.java | 23 ++++++++++++----
3 files changed, 56 insertions(+), 7 deletions(-)
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index 7964efd3f..114629e2d 100755
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -195,11 +195,16 @@ public class SchemaOperator extends
AbstractStreamOperatorAdapter<Event>
private void requestSchemaChange(
TableId sourceTableId, SchemaChangeRequest schemaChangeRequest) {
- LOG.info("{}> Sent FlushEvent to downstream...", subTaskId);
+ final int sourcePartition = schemaChangeRequest.getSourceSubTaskId();
+
+ LOG.info(
+ "{}> Sent FlushEvent to downstream for source partition {}...",
+ subTaskId,
+ sourcePartition);
output.collect(
new StreamRecord<>(
new FlushEvent(
- subTaskId,
+ sourcePartition,
tableIdRouter.route(sourceTableId),
schemaChangeRequest.getSchemaChangeEvent().getType())));
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
index 4245887f2..b7424cb5d 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
@@ -382,6 +382,37 @@ public class SchemaEvolveTest extends SchemaTestBase {
"Unexpected schema change events occurred in EXCEPTION
mode. Job will fail now.");
}
+ @Test
+ void testFlushEventUsesSourcePartitionInsteadOfSchemaOperatorSubtask()
throws Exception {
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID,
INITIAL_SCHEMA);
+
+ Assertions.assertThat(
+ runInHarness(
+ () ->
+ new SchemaOperator(
+ ROUTING_RULES,
+ RouteMode.ALL_MATCH,
+ Duration.ofMinutes(3),
+ SchemaChangeBehavior.LENIENT,
+ "UTC"),
+ (op) ->
+ new
DistributedEventOperatorTestHarness<>(
+ op,
+ 20,
+ 1,
+ Duration.ofSeconds(3),
+ Duration.ofMinutes(3)),
+ (operator, harness) ->
+
operator.processElement(wrap(createTableEvent, 0, 1))))
+ .map(StreamRecord::getValue)
+ .first()
+ .isEqualTo(
+ new FlushEvent(
+ 0,
+ Collections.singletonList(TABLE_ID),
+ createTableEvent.getType()));
+ }
+
protected static <
OP extends AbstractStreamOperatorAdapter<E>,
E extends Event,
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
index 3e81ec82b..8277dc9f1 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
@@ -81,15 +81,26 @@ public class DistributedEventOperatorTestHarness<
private final TestingSchemaRegistryGateway schemaRegistryGateway;
private final LinkedList<StreamRecord<E>> outputRecords = new
LinkedList<>();
private final MockedOperatorCoordinatorContext mockedContext;
+ private final int subtaskIndex;
public DistributedEventOperatorTestHarness(OP operator, int numOutputs) {
- this(operator, numOutputs, Duration.ofSeconds(3),
Duration.ofMinutes(3));
+ this(operator, numOutputs, 0, Duration.ofSeconds(3),
Duration.ofMinutes(3));
}
public DistributedEventOperatorTestHarness(
OP operator, int numOutputs, Duration applyDuration, Duration
rpcTimeout) {
+ this(operator, numOutputs, 0, applyDuration, rpcTimeout);
+ }
+
+ public DistributedEventOperatorTestHarness(
+ OP operator,
+ int numOutputs,
+ int subtaskIndex,
+ Duration applyDuration,
+ Duration rpcTimeout) {
this.operator = operator;
this.numOutputs = numOutputs;
+ this.subtaskIndex = subtaskIndex;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID,
Thread.currentThread().getContextClassLoader());
@@ -160,7 +171,7 @@ public class DistributedEventOperatorTestHarness<
private void initializeOperator() throws Exception {
operator.setup(
- new MockStreamTask(schemaRegistryGateway),
+ new MockStreamTask(schemaRegistryGateway, subtaskIndex),
new MockStreamConfig(new Configuration(), numOutputs),
new EventCollectingOutput<>(outputRecords,
schemaRegistryGateway));
schemaRegistryGateway.sendOperatorEventToCoordinator(
@@ -227,9 +238,10 @@ public class DistributedEventOperatorTestHarness<
}
private static class MockStreamTask extends StreamTask<Event,
AbstractStreamOperator<Event>> {
- protected MockStreamTask(TestingSchemaRegistryGateway
schemaRegistryGateway)
+ protected MockStreamTask(
+ TestingSchemaRegistryGateway schemaRegistryGateway, int
subtaskIndex)
throws Exception {
- super(new
SchemaRegistryCoordinatingEnvironment(schemaRegistryGateway));
+ super(new
SchemaRegistryCoordinatingEnvironment(schemaRegistryGateway, subtaskIndex));
}
@Override
@@ -240,7 +252,8 @@ public class DistributedEventOperatorTestHarness<
private final TestingSchemaRegistryGateway schemaRegistryGateway;
public SchemaRegistryCoordinatingEnvironment(
- TestingSchemaRegistryGateway schemaRegistryGateway) {
+ TestingSchemaRegistryGateway schemaRegistryGateway, int
subtaskIndex) {
+ super("test-task", 2, subtaskIndex, 2);
this.schemaRegistryGateway = schemaRegistryGateway;
}