This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new ad1f554c0 [FLINK-35647][route] Support symbol replacement to enrich 
routing rules
ad1f554c0 is described below

commit ad1f554c0eb0a4be9fe6061c1942d1f284b68f5a
Author: yuxiqian <[email protected]>
AuthorDate: Fri Jul 12 17:35:22 2024 +0800

    [FLINK-35647][route] Support symbol replacement to enrich routing rules
    
    This closes #3428.
    
    Co-authored-by: 张田 <[email protected]>
    Co-authored-by: yangshuaitong <[email protected]>
---
 docs/content.zh/docs/core-concept/route.md         | 27 ++++--
 docs/content/docs/core-concept/route.md            | 27 ++++--
 .../cli/parser/YamlPipelineDefinitionParser.java   |  7 +-
 .../parser/YamlPipelineDefinitionParserTest.java   | 81 +++++++++++++++++-
 .../pipeline-definition-full-with-repsym.yaml      | 61 ++++++++++++++
 .../apache/flink/cdc/common/route/RouteRule.java   | 36 ++++++++
 .../flink/cdc/composer/definition/RouteDef.java    | 17 +++-
 .../flink/translator/SchemaOperatorTranslator.java | 10 ++-
 .../flink/FlinkPipelineComposerITCase.java         | 63 +++++++++++++-
 .../flink/cdc/pipeline/tests/RouteE2eITCase.java   | 96 ++++++++++++++++++++++
 .../runtime/operators/schema/SchemaOperator.java   | 59 +++++++++----
 .../operators/schema/SchemaOperatorFactory.java    |  9 +-
 .../schema/coordinator/SchemaDerivation.java       | 39 +++++++--
 .../schema/coordinator/SchemaRegistry.java         |  7 +-
 .../schema/coordinator/SchemaRegistryProvider.java | 24 +-----
 .../schema/coordinator/SchemaDerivationTest.java   | 11 +--
 16 files changed, 494 insertions(+), 80 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/route.md 
b/docs/content.zh/docs/core-concept/route.md
index 5e6920579..ca5855d58 100644
--- a/docs/content.zh/docs/core-concept/route.md
+++ b/docs/content.zh/docs/core-concept/route.md
@@ -30,11 +30,12 @@ under the License.
 # Parameters
 To describe a route, the follows are required:  
 
-| parameter    | meaning                                            | 
optional/required |
-|--------------|----------------------------------------------------|-------------------|
-| source-table | Source table id, supports regular expressions      | required 
         |
-| sink-table   | Sink table id, supports regular expressions        | required 
         |
-| description  | Routing rule description(a default value provided) | optional 
         |
+| parameter      | meaning                                                     
                                | optional/required |
+|----------------|---------------------------------------------------------------------------------------------|-------------------|
+| source-table   | Source table id, supports regular expressions               
                                | required          |
+| sink-table     | Sink table id, supports symbol replacement                  
                                | required          |
+| replace-symbol | Special symbol in sink-table for pattern replacing; it is 
replaced by the source table name | optional          |
+| description    | Routing rule description(a default value provided)          
                                | optional          |
 
 A route module can contain a list of source-table/sink-table rules.
 
@@ -71,4 +72,18 @@ route:
   - source-table: mydb.products
     sink-table: ods_db.ods_products
     description: sync products table to ods_products
-```
\ No newline at end of file
+```
+
+## Pattern Replacement in routing rules
+
+If you'd like to route source tables and rename them to sink tables with 
specific patterns, `replace-symbol` can be used as a placeholder for the source 
table name, like this:
+
+```yaml
+route:
+  - source-table: source_db.\.*
+    sink-table: sink_db.<>
+    replace-symbol: <>
+    description: route all tables in source_db to sink_db
+```
+
+Then, every table matching `source_db.XXX` will be routed to `sink_db.XXX`, 
with `<>` replaced by the source table name `XXX`.
\ No newline at end of file
diff --git a/docs/content/docs/core-concept/route.md 
b/docs/content/docs/core-concept/route.md
index 5e6920579..ca5855d58 100644
--- a/docs/content/docs/core-concept/route.md
+++ b/docs/content/docs/core-concept/route.md
@@ -30,11 +30,12 @@ under the License.
 # Parameters
 To describe a route, the follows are required:  
 
-| parameter    | meaning                                            | 
optional/required |
-|--------------|----------------------------------------------------|-------------------|
-| source-table | Source table id, supports regular expressions      | required 
         |
-| sink-table   | Sink table id, supports regular expressions        | required 
         |
-| description  | Routing rule description(a default value provided) | optional 
         |
+| parameter      | meaning                                                     
                                | optional/required |
+|----------------|---------------------------------------------------------------------------------------------|-------------------|
+| source-table   | Source table id, supports regular expressions               
                                | required          |
+| sink-table     | Sink table id, supports symbol replacement                  
                                | required          |
+| replace-symbol | Special symbol in sink-table for pattern replacing; it is 
replaced by the source table name | optional          |
+| description    | Routing rule description(a default value provided)          
                                | optional          |
 
 A route module can contain a list of source-table/sink-table rules.
 
@@ -71,4 +72,18 @@ route:
   - source-table: mydb.products
     sink-table: ods_db.ods_products
     description: sync products table to ods_products
-```
\ No newline at end of file
+```
+
+## Pattern Replacement in routing rules
+
+If you'd like to route source tables and rename them to sink tables with 
specific patterns, `replace-symbol` can be used as a placeholder for the source 
table name, like this:
+
+```yaml
+route:
+  - source-table: source_db.\.*
+    sink-table: sink_db.<>
+    replace-symbol: <>
+    description: route all tables in source_db to sink_db
+```
+
+Then, every table matching `source_db.XXX` will be routed to `sink_db.XXX`, 
with `<>` replaced by the source table name `XXX`.
\ No newline at end of file
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 23b2c63ff..7ad07af1a 100644
--- 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -55,6 +55,7 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
     // Route keys
     private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
     private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
+    private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
     private static final String ROUTE_DESCRIPTION_KEY = "description";
 
     // Transform keys
@@ -164,11 +165,15 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
                                 "Missing required field \"%s\" in route 
configuration",
                                 ROUTE_SINK_TABLE_KEY)
                         .asText();
+        String replaceSymbol =
+                Optional.ofNullable(routeNode.get(ROUTE_REPLACE_SYMBOL))
+                        .map(JsonNode::asText)
+                        .orElse(null);
         String description =
                 Optional.ofNullable(routeNode.get(ROUTE_DESCRIPTION_KEY))
                         .map(JsonNode::asText)
                         .orElse(null);
-        return new RouteDef(sourceTable, sinkTable, description);
+        return new RouteDef(sourceTable, sinkTable, replaceSymbol, 
description);
     }
 
     private TransformDef toTransformDef(JsonNode transformNode) {
diff --git 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 2d05bcbde..75dc5bd62 100644
--- 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -166,6 +166,15 @@ class YamlPipelineDefinitionParserTest {
                                 + "Or use 'UTC' without time zone and daylight 
saving time.");
     }
 
+    @Test
+    void testRouteWithReplacementSymbol() throws Exception {
+        URL resource =
+                
Resources.getResource("definitions/pipeline-definition-full-with-repsym.yaml");
+        YamlPipelineDefinitionParser parser = new 
YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), 
new Configuration());
+        assertThat(pipelineDef).isEqualTo(fullDefWithRouteRepSym);
+    }
+
     private final PipelineDef fullDef =
             new PipelineDef(
                     new SourceDef(
@@ -197,10 +206,12 @@ class YamlPipelineDefinitionParserTest {
                             new RouteDef(
                                     "mydb.default.app_order_.*",
                                     "odsdb.default.app_order",
+                                    null,
                                     "sync all sharding tables to one"),
                             new RouteDef(
                                     "mydb.default.web_order",
                                     "odsdb.default.ods_web_order",
+                                    null,
                                     "sync table to with given prefix ods_")),
                     Arrays.asList(
                             new TransformDef(
@@ -258,10 +269,12 @@ class YamlPipelineDefinitionParserTest {
                             new RouteDef(
                                     "mydb.default.app_order_.*",
                                     "odsdb.default.app_order",
+                                    null,
                                     "sync all sharding tables to one"),
                             new RouteDef(
                                     "mydb.default.web_order",
                                     "odsdb.default.ods_web_order",
+                                    null,
                                     "sync table to with given prefix ods_")),
                     Arrays.asList(
                             new TransformDef(
@@ -312,7 +325,10 @@ class YamlPipelineDefinitionParserTest {
                                             .build())),
                     Collections.singletonList(
                             new RouteDef(
-                                    "mydb.default.app_order_.*", 
"odsdb.default.app_order", null)),
+                                    "mydb.default.app_order_.*",
+                                    "odsdb.default.app_order",
+                                    null,
+                                    null)),
                     Collections.emptyList(),
                     Configuration.fromMap(
                             ImmutableMap.<String, String>builder()
@@ -326,4 +342,67 @@ class YamlPipelineDefinitionParserTest {
                     Collections.emptyList(),
                     Collections.emptyList(),
                     
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));
+
+    private final PipelineDef fullDefWithRouteRepSym =
+            new PipelineDef(
+                    new SourceDef(
+                            "mysql",
+                            "source-database",
+                            Configuration.fromMap(
+                                    ImmutableMap.<String, String>builder()
+                                            .put("host", "localhost")
+                                            .put("port", "3306")
+                                            .put("username", "admin")
+                                            .put("password", "pass")
+                                            .put(
+                                                    "tables",
+                                                    "adb.*, 
bdb.user_table_[0-9]+, [app|web]_order_.*")
+                                            .put(
+                                                    "chunk-column",
+                                                    
"app_order_.*:id,web_order:product_id")
+                                            .put("capture-new-tables", "true")
+                                            .build())),
+                    new SinkDef(
+                            "kafka",
+                            "sink-queue",
+                            Configuration.fromMap(
+                                    ImmutableMap.<String, String>builder()
+                                            .put("bootstrap-servers", 
"localhost:9092")
+                                            .put("auto-create-table", "true")
+                                            .build())),
+                    Arrays.asList(
+                            new RouteDef(
+                                    "mydb.default.app_order_.*",
+                                    "odsdb.default.app_order_<>",
+                                    "<>",
+                                    "sync all sharding tables to one"),
+                            new RouteDef(
+                                    "mydb.default.web_order",
+                                    "odsdb.default.ods_web_order_>_<",
+                                    ">_<",
+                                    "sync table to with given prefix ods_")),
+                    Arrays.asList(
+                            new TransformDef(
+                                    "mydb.app_order_.*",
+                                    "id, order_id, TO_UPPER(product_name)",
+                                    "id > 10 AND order_id > 100",
+                                    "id",
+                                    "product_name",
+                                    "comment=app order",
+                                    "project fields from source table"),
+                            new TransformDef(
+                                    "mydb.web_order_.*",
+                                    "CONCAT(id, order_id) as uniq_id, *",
+                                    "uniq_id > 10",
+                                    null,
+                                    null,
+                                    null,
+                                    "add new uniq_id for each row")),
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("name", "source-database-sync-pipe")
+                                    .put("parallelism", "4")
+                                    .put("schema.change.behavior", "evolve")
+                                    .put("schema-operator.rpc-timeout", "1 h")
+                                    .build()));
 }
diff --git 
a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
 
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
new file mode 100644
index 000000000..265358fb4
--- /dev/null
+++ 
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
@@ -0,0 +1,61 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+source:
+  type: mysql
+  name: source-database
+  host: localhost
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
+  chunk-column: app_order_.*:id,web_order:product_id
+  capture-new-tables: true
+
+sink:
+  type: kafka
+  name: sink-queue
+  bootstrap-servers: localhost:9092
+  auto-create-table: true
+
+route:
+  - source-table: mydb.default.app_order_.*
+    sink-table: odsdb.default.app_order_<>
+    replace-symbol: "<>"
+    description: sync all sharding tables to one
+  - source-table: mydb.default.web_order
+    sink-table: odsdb.default.ods_web_order_>_<
+    replace-symbol: ">_<"
+    description: sync table to with given prefix ods_
+
+transform:
+  - source-table: mydb.app_order_.*
+    projection: id, order_id, TO_UPPER(product_name)
+    filter: id > 10 AND order_id > 100
+    primary-keys: id
+    partition-keys: product_name
+    table-options: comment=app order
+    description: project fields from source table
+  - source-table: mydb.web_order_.*
+    projection: CONCAT(id, order_id) as uniq_id, *
+    filter: uniq_id > 10
+    description: add new uniq_id for each row
+
+pipeline:
+  name: source-database-sync-pipe
+  parallelism: 4
+  schema.change.behavior: evolve
+  schema-operator.rpc-timeout: 1 h
\ No newline at end of file
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
new file mode 100644
index 000000000..4fbfb61b6
--- /dev/null
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.route;
+
+import java.io.Serializable;
+
+/** Definition of a routing rule (source table, sink table) with an optional replacement symbol. */
+public class RouteRule implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public RouteRule(String sourceTable, String sinkTable, String 
replaceSymbol) {
+        this.sourceTable = sourceTable;
+        this.sinkTable = sinkTable;
+        this.replaceSymbol = replaceSymbol; // null when the route definition supplies no symbol
+    }
+
+    public String sourceTable; // source table id; documented as supporting regular expressions
+    public String sinkTable; // sink table id; may contain replaceSymbol as a placeholder
+    public String replaceSymbol; // symbol in sinkTable to be replaced by the original table name
+}
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
index ee55eb366..868c5c67f 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
@@ -36,11 +36,17 @@ import java.util.Optional;
 public class RouteDef {
     private final String sourceTable;
     private final String sinkTable;
+    private final String replaceSymbol;
     @Nullable private final String description;
 
-    public RouteDef(String sourceTable, String sinkTable, @Nullable String 
description) {
+    public RouteDef(
+            String sourceTable,
+            String sinkTable,
+            @Nullable String replaceSymbol,
+            @Nullable String description) {
         this.sourceTable = sourceTable;
         this.sinkTable = sinkTable;
+        this.replaceSymbol = replaceSymbol;
         this.description = description;
     }
 
@@ -52,6 +58,10 @@ public class RouteDef {
         return sinkTable;
     }
 
+    public Optional<String> getReplaceSymbol() {
+        return Optional.ofNullable(replaceSymbol);
+    }
+
     public Optional<String> getDescription() {
         return Optional.ofNullable(description);
     }
@@ -63,6 +73,8 @@ public class RouteDef {
                 + sourceTable
                 + ", sinkTable="
                 + sinkTable
+                + ", replaceSymbol="
+                + replaceSymbol
                 + ", description='"
                 + description
                 + '\''
@@ -80,11 +92,12 @@ public class RouteDef {
         RouteDef routeDef = (RouteDef) o;
         return Objects.equals(sourceTable, routeDef.sourceTable)
                 && Objects.equals(sinkTable, routeDef.sinkTable)
+                && Objects.equals(replaceSymbol, routeDef.replaceSymbol)
                 && Objects.equals(description, routeDef.description);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(sourceTable, sinkTable, description);
+        return Objects.hash(sourceTable, sinkTable, replaceSymbol, 
description);
     }
 }
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
index a69741c7b..1f5dc44bd 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
@@ -17,12 +17,11 @@
 
 package org.apache.flink.cdc.composer.flink.translator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.composer.definition.RouteDef;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
@@ -80,10 +79,13 @@ public class SchemaOperatorTranslator {
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
-        List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
+        List<RouteRule> routingRules = new ArrayList<>();
         for (RouteDef route : routes) {
             routingRules.add(
-                    Tuple2.of(route.getSourceTable(), 
TableId.parse(route.getSinkTable())));
+                    new RouteRule(
+                            route.getSourceTable(),
+                            route.getSinkTable(),
+                            route.getReplaceSymbol().orElse(null)));
         }
         SingleOutputStreamOperator<Event> stream =
                 input.transform(
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index b59f4ead6..26c9c9187 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -424,8 +424,8 @@ class FlinkPipelineComposerITCase {
         TableId routedTable2 = TableId.tableId("default_namespace", 
"default_schema", "routed2");
         List<RouteDef> routeDef =
                 Arrays.asList(
-                        new RouteDef(TABLE_1.toString(), 
routedTable1.toString(), null),
-                        new RouteDef(TABLE_2.toString(), 
routedTable2.toString(), null));
+                        new RouteDef(TABLE_1.toString(), 
routedTable1.toString(), null, null),
+                        new RouteDef(TABLE_2.toString(), 
routedTable2.toString(), null, null));
 
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
@@ -616,6 +616,7 @@ class FlinkPipelineComposerITCase {
                         new RouteDef(
                                 
"default_namespace.default_schema.mytable[0-9]",
                                 mergedTable.toString(),
+                                null,
                                 null));
 
         // Setup pipeline
@@ -657,4 +658,62 @@ class FlinkPipelineComposerITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[5, null, 24, null, Eliza, null], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
     }
+
+    @ParameterizedTest
+    @EnumSource
+    void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws 
Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.singletonList(
+                                new RouteDef(
+                                        
"default_namespace.default_schema.table[0-9]",
+                                        
"replaced_namespace.replaced_schema.__$__",
+                                        "__$__",
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+                        
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2, 
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+                        
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], 
after=[1, 1], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], 
after=[2, 2], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], 
after=[3, 3], op=INSERT, meta=()}",
+                        
"AddColumnEvent{tableId=replaced_namespace.replaced_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, 
existedColumnName=null}]}",
+                        
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], 
after=[1, 1], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], 
after=[2, 2], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], 
after=[3, 3], op=INSERT, meta=()}",
+                        
"RenameColumnEvent{tableId=replaced_namespace.replaced_schema.table1, 
nameMapping={col2=newCol2, col3=newCol3}}",
+                        
"DropColumnEvent{tableId=replaced_namespace.replaced_schema.table1, 
droppedColumnNames=[newCol2]}",
+                        
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 
1], after=[], op=DELETE, meta=()}",
+                        
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 
2], after=[2, x], op=UPDATE, meta=()}");
+    }
 }
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index 4ff001e79..e674d57d0 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -624,6 +624,102 @@ public class RouteE2eITCase extends 
PipelineTestEnvironment {
                 "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[10004], op=INSERT, meta=()}");
     }
 
+    @Test
+    public void testReplacementSymbol() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "route:\n"
+                                + "  - source-table: %s.\\.*\n"
+                                + "    sink-table: NEW_%s.NEW_<>\n"
+                                + "    replace-symbol: <>\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: 1",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        routeTestDatabase.getDatabaseName(),
+                        routeTestDatabase.getDatabaseName(),
+                        routeTestDatabase.getDatabaseName());
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        waitUntilSpecificEvent(
+                String.format(
+                        "CreateTableEvent{tableId=NEW_%s.NEW_TABLEALPHA, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                        routeTestDatabase.getDatabaseName()));
+        waitUntilSpecificEvent(
+                String.format(
+                        "CreateTableEvent{tableId=NEW_%s.NEW_TABLEBETA, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                        routeTestDatabase.getDatabaseName()));
+        waitUntilSpecificEvent(
+                String.format(
+                        "CreateTableEvent{tableId=NEW_%s.NEW_TABLEGAMMA, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                        routeTestDatabase.getDatabaseName()));
+        waitUntilSpecificEvent(
+                String.format(
+                        "CreateTableEvent{tableId=NEW_%s.NEW_TABLEDELTA, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                        routeTestDatabase.getDatabaseName()));
+
+        validateResult(
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], 
after=[1008, 8], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], 
after=[1009, 8.1], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], 
after=[1010, 10], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], 
after=[1011, 11], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], 
after=[2011, 11], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], 
after=[2012, 12], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], 
after=[2013, 13], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], 
after=[2014, 14], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], 
after=[3015, Amber], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], 
after=[3016, Black], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], 
after=[3017, Cyan], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], 
after=[3018, Denim], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], 
after=[4019, Yosemite], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], 
after=[4020, El Capitan], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], 
after=[4021, Sierra], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], 
after=[4022, High Sierra], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], 
after=[4023, Mojave], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], 
after=[4024, Catalina], op=INSERT, meta=()}");
+
+        LOG.info("Begin incremental reading stage.");
+
+        generateIncrementalChanges();
+
+        validateResult(
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], 
after=[3007, 7], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[2014, 
14], after=[2014, 2014], op=UPDATE, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], 
after=[3019, Emerald], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[4024, 
Catalina], after=[], op=DELETE, meta=()}");
+
+        generateSchemaChanges();
+        validateResult(
+                "AddColumnEvent{tableId=NEW_%s.NEW_TABLEALPHA, 
addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=LAST, 
existedColumnName=null}]}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], 
after=[10001, 12, Derrida], op=INSERT, meta=()}",
+                "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, 
nameMapping={VERSION=VERSION_EX}}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], 
after=[10002, 15], op=INSERT, meta=()}",
+                "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, 
nameMapping={VERSION=VARCHAR(19)}}",
+                "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, 
nameMapping={VERSION=VERSION_EX}}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], 
after=[10003, Fluorite], op=INSERT, meta=()}",
+                "DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, 
droppedColumnNames=[VERSION]}",
+                "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], 
after=[10004], op=INSERT, meta=()}");
+    }
+
     private void validateResult(String... expectedEvents) throws Exception {
         for (String event : expectedEvents) {
             waitUntilSpecificEvent(String.format(event, 
routeTestDatabase.getDatabaseName()));
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 430e7f263..d1f468bfe 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.cdc.runtime.operators.schema;
 
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
@@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
@@ -70,6 +71,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
@@ -88,22 +90,33 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
     private static final Logger LOG = 
LoggerFactory.getLogger(SchemaOperator.class);
     private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
 
-    private final List<Tuple2<String, TableId>> routingRules;
+    private final List<RouteRule> routingRules;
+
+    /**
+     * Storing route source table selector, sink table name (before symbol 
replacement), and replace
+     * symbol in a tuple.
+     */
+    private transient List<Tuple3<Selectors, String, String>> routes;
 
-    private transient List<Tuple2<Selectors, TableId>> routes;
     private transient TaskOperatorEventGateway toCoordinator;
     private transient SchemaEvolutionClient schemaEvolutionClient;
     private transient LoadingCache<TableId, Schema> cachedSchemas;
 
+    /**
+     * Caches the mapping from an upstream tableId (source table) to its 
downstream
+     * tableIds (sink tables).
+     */
+    private transient LoadingCache<TableId, List<TableId>> tableIdMappingCache;
+
     private final long rpcTimeOutInMillis;
 
-    public SchemaOperator(List<Tuple2<String, TableId>> routingRules) {
+    public SchemaOperator(List<RouteRule> routingRules) {
         this.routingRules = routingRules;
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = 
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
     }
 
-    public SchemaOperator(List<Tuple2<String, TableId>> routingRules, Duration 
rpcTimeOut) {
+    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
         this.routingRules = routingRules;
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
@@ -119,14 +132,14 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
         routes =
                 routingRules.stream()
                         .map(
-                                tuple2 -> {
-                                    String tableInclusions = tuple2.f0;
-                                    TableId replaceBy = tuple2.f1;
+                                rule -> {
+                                    String tableInclusions = rule.sourceTable;
                                     Selectors selectors =
                                             new Selectors.SelectorsBuilder()
                                                     
.includeTables(tableInclusions)
                                                     .build();
-                                    return new Tuple2<>(selectors, replaceBy);
+                                    return new Tuple3<>(
+                                            selectors, rule.sinkTable, 
rule.replaceSymbol);
                                 })
                         .collect(Collectors.toList());
         schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, 
getOperatorID());
@@ -140,6 +153,16 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
                                         return getLatestSchema(tableId);
                                     }
                                 });
+        tableIdMappingCache =
+                CacheBuilder.newBuilder()
+                        .expireAfterAccess(CACHE_EXPIRE_DURATION)
+                        .build(
+                                new CacheLoader<TableId, List<TableId>>() {
+                                    @Override
+                                    public List<TableId> load(TableId tableId) 
{
+                                        return getRoutedTables(tableId);
+                                    }
+                                });
     }
 
     @Override
@@ -158,7 +181,7 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
      */
     @Override
     public void processElement(StreamRecord<Event> streamRecord)
-            throws InterruptedException, TimeoutException {
+            throws InterruptedException, TimeoutException, ExecutionException {
         Event event = streamRecord.getValue();
         // Schema changes
         if (event instanceof SchemaChangeEvent) {
@@ -169,15 +192,15 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
             handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
             // Update caches
             cachedSchemas.put(tableId, getLatestSchema(tableId));
-            getRoutedTables(tableId)
+            tableIdMappingCache
+                    .get(tableId)
                     .forEach(routed -> cachedSchemas.put(routed, 
getLatestSchema(routed)));
             return;
         }
 
         // Data changes
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-        TableId tableId = dataChangeEvent.tableId();
-        List<TableId> optionalRoutedTable = getRoutedTables(tableId);
+        List<TableId> optionalRoutedTable = 
tableIdMappingCache.get(dataChangeEvent.tableId());
         if (optionalRoutedTable.isEmpty()) {
             output.collect(streamRecord);
         } else {
@@ -270,10 +293,18 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
     private List<TableId> getRoutedTables(TableId originalTableId) {
         return routes.stream()
                 .filter(route -> route.f0.isMatch(originalTableId))
-                .map(route -> route.f1)
+                .map(route -> resolveReplacement(originalTableId, route))
                 .collect(Collectors.toList());
     }
 
+    private TableId resolveReplacement(
+            TableId originalTable, Tuple3<Selectors, String, String> route) {
+        if (route.f2 != null) {
+            return TableId.parse(route.f1.replace(route.f2, 
originalTable.getTableName()));
+        }
+        return TableId.parse(route.f1);
+    }
+
     private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
schemaChangeEvent)
             throws InterruptedException, TimeoutException {
         // The request will need to send a FlushEvent or block until flushing 
finished
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
index ecf500171..eba7c7712 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
@@ -17,10 +17,9 @@
 
 package org.apache.flink.cdc.runtime.operators.schema;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -40,12 +39,10 @@ public class SchemaOperatorFactory extends 
SimpleOperatorFactory<Event>
     private static final long serialVersionUID = 1L;
 
     private final MetadataApplier metadataApplier;
-    private final List<Tuple2<String, TableId>> routingRules;
+    private final List<RouteRule> routingRules;
 
     public SchemaOperatorFactory(
-            MetadataApplier metadataApplier,
-            List<Tuple2<String, TableId>> routingRules,
-            Duration rpcTimeOut) {
+            MetadataApplier metadataApplier, List<RouteRule> routingRules, 
Duration rpcTimeOut) {
         super(new SchemaOperator(routingRules, rpcTimeOut));
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
index 7b8538382..b936da608 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.PhysicalColumn;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -48,19 +49,37 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Derive schema changes based on the routing rules. */
 public class SchemaDerivation {
     private final SchemaManager schemaManager;
-    private final List<Tuple2<Selectors, TableId>> routes;
     private final Map<TableId, Set<TableId>> derivationMapping;
 
+    /**
+     * Storing route source table selector, sink table name (before symbol 
replacement), and replace
+     * symbol in a tuple.
+     */
+    private transient List<Tuple3<Selectors, String, String>> routes;
+
     public SchemaDerivation(
             SchemaManager schemaManager,
-            List<Tuple2<Selectors, TableId>> routes,
+            List<RouteRule> routeRules,
             Map<TableId, Set<TableId>> derivationMapping) {
         this.schemaManager = schemaManager;
-        this.routes = routes;
+        this.routes =
+                routeRules.stream()
+                        .map(
+                                rule -> {
+                                    String tableInclusions = rule.sourceTable;
+                                    Selectors selectors =
+                                            new Selectors.SelectorsBuilder()
+                                                    
.includeTables(tableInclusions)
+                                                    .build();
+                                    return new Tuple3<>(
+                                            selectors, rule.sinkTable, 
rule.replaceSymbol);
+                                })
+                        .collect(Collectors.toList());
         this.derivationMapping = derivationMapping;
     }
 
@@ -69,7 +88,7 @@ public class SchemaDerivation {
         TableId originalTable = schemaChangeEvent.tableId();
         boolean noRouteMatched = true;
 
-        for (Tuple2<Selectors, TableId> route : routes) {
+        for (Tuple3<Selectors, String, String> route : routes) {
             // Check routing table
             if (!route.f0.isMatch(originalTable)) {
                 continue;
@@ -78,7 +97,7 @@ public class SchemaDerivation {
             noRouteMatched = false;
 
             // Matched a routing rule
-            TableId derivedTable = route.f1;
+            TableId derivedTable = resolveReplacement(originalTable, route);
             Set<TableId> originalTables =
                     derivationMapping.computeIfAbsent(derivedTable, t -> new 
HashSet<>());
             originalTables.add(originalTable);
@@ -134,6 +153,14 @@ public class SchemaDerivation {
         }
     }
 
+    private TableId resolveReplacement(
+            TableId originalTable, Tuple3<Selectors, String, String> route) {
+        if (route.f2 != null) {
+            return TableId.parse(route.f1.replace(route.f2, 
originalTable.getTableName()));
+        }
+        return TableId.parse(route.f1);
+    }
+
     public Map<TableId, Set<TableId>> getDerivationMapping() {
         return derivationMapping;
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 2c718dcec..02abb8903 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -17,9 +17,8 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
@@ -90,7 +89,7 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
     /** Metadata applier for applying schema changes to external system. */
     private final MetadataApplier metadataApplier;
 
-    private final List<Tuple2<Selectors, TableId>> routes;
+    private final List<RouteRule> routes;
 
     /** The request handler that handle all requests and events. */
     private SchemaRegistryRequestHandler requestHandler;
@@ -104,7 +103,7 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
             String operatorName,
             OperatorCoordinator.Context context,
             MetadataApplier metadataApplier,
-            List<Tuple2<Selectors, TableId>> routes) {
+            List<RouteRule> routes) {
         this.context = context;
         this.operatorName = operatorName;
         this.failedReasons = new HashMap<>();
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
index 1f6e7aaf5..0db0cf3a7 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
@@ -17,16 +17,13 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** Provider of {@link SchemaRegistry}. */
 @Internal
@@ -36,13 +33,13 @@ public class SchemaRegistryProvider implements 
OperatorCoordinator.Provider {
     private final OperatorID operatorID;
     private final String operatorName;
     private final MetadataApplier metadataApplier;
-    private final List<Tuple2<String, TableId>> routingRules;
+    private final List<RouteRule> routingRules;
 
     public SchemaRegistryProvider(
             OperatorID operatorID,
             String operatorName,
             MetadataApplier metadataApplier,
-            List<Tuple2<String, TableId>> routingRules) {
+            List<RouteRule> routingRules) {
         this.operatorID = operatorID;
         this.operatorName = operatorName;
         this.metadataApplier = metadataApplier;
@@ -56,19 +53,6 @@ public class SchemaRegistryProvider implements 
OperatorCoordinator.Provider {
 
     @Override
     public OperatorCoordinator create(OperatorCoordinator.Context context) 
throws Exception {
-        List<Tuple2<Selectors, TableId>> routes =
-                routingRules.stream()
-                        .map(
-                                tuple2 -> {
-                                    String tableInclusions = tuple2.f0;
-                                    TableId replaceBy = tuple2.f1;
-                                    Selectors selectors =
-                                            new Selectors.SelectorsBuilder()
-                                                    
.includeTables(tableInclusions)
-                                                    .build();
-                                    return new Tuple2<>(selectors, replaceBy);
-                                })
-                        .collect(Collectors.toList());
-        return new SchemaRegistry(operatorName, context, metadataApplier, 
routes);
+        return new SchemaRegistry(operatorName, context, metadataApplier, 
routingRules);
     }
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
index 19fd5a4a1..adaf3b140 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -25,10 +24,10 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.PhysicalColumn;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 
@@ -81,13 +80,9 @@ class SchemaDerivationTest {
                     .column(Column.physicalColumn("gender", 
DataTypes.STRING()))
                     .build();
 
-    private static final List<Tuple2<Selectors, TableId>> ROUTES =
+    private static final List<RouteRule> ROUTES =
             Collections.singletonList(
-                    Tuple2.of(
-                            new Selectors.SelectorsBuilder()
-                                    
.includeTables("mydb.myschema.mytable[0-9]")
-                                    .build(),
-                            MERGED_TABLE));
+                    new RouteRule("mydb.myschema.mytable[0-9]", 
MERGED_TABLE.toString(), null));
 
     @Test
     void testOneToOneMapping() {

Reply via email to