This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new ad1f554c0 [FLINK-35647][route] Support symbol replacement to enrich
routing rules
ad1f554c0 is described below
commit ad1f554c0eb0a4be9fe6061c1942d1f284b68f5a
Author: yuxiqian <[email protected]>
AuthorDate: Fri Jul 12 17:35:22 2024 +0800
[FLINK-35647][route] Support symbol replacement to enrich routing rules
This closes #3428.
Co-authored-by: 张田 <[email protected]>
Co-authored-by: yangshuaitong <[email protected]>
---
docs/content.zh/docs/core-concept/route.md | 27 ++++--
docs/content/docs/core-concept/route.md | 27 ++++--
.../cli/parser/YamlPipelineDefinitionParser.java | 7 +-
.../parser/YamlPipelineDefinitionParserTest.java | 81 +++++++++++++++++-
.../pipeline-definition-full-with-repsym.yaml | 61 ++++++++++++++
.../apache/flink/cdc/common/route/RouteRule.java | 36 ++++++++
.../flink/cdc/composer/definition/RouteDef.java | 17 +++-
.../flink/translator/SchemaOperatorTranslator.java | 10 ++-
.../flink/FlinkPipelineComposerITCase.java | 63 +++++++++++++-
.../flink/cdc/pipeline/tests/RouteE2eITCase.java | 96 ++++++++++++++++++++++
.../runtime/operators/schema/SchemaOperator.java | 59 +++++++++----
.../operators/schema/SchemaOperatorFactory.java | 9 +-
.../schema/coordinator/SchemaDerivation.java | 39 +++++++--
.../schema/coordinator/SchemaRegistry.java | 7 +-
.../schema/coordinator/SchemaRegistryProvider.java | 24 +-----
.../schema/coordinator/SchemaDerivationTest.java | 11 +--
16 files changed, 494 insertions(+), 80 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/route.md
b/docs/content.zh/docs/core-concept/route.md
index 5e6920579..ca5855d58 100644
--- a/docs/content.zh/docs/core-concept/route.md
+++ b/docs/content.zh/docs/core-concept/route.md
@@ -30,11 +30,12 @@ under the License.
# Parameters
To describe a route, the follows are required:
-| parameter | meaning |
optional/required |
-|--------------|----------------------------------------------------|-------------------|
-| source-table | Source table id, supports regular expressions | required
|
-| sink-table | Sink table id, supports regular expressions | required
|
-| description | Routing rule description(a default value provided) | optional
|
+| parameter | meaning
| optional/required |
+|----------------|---------------------------------------------------------------------------------------------|-------------------|
+| source-table | Source table id, supports regular expressions
| required |
+| sink-table | Sink table id, supports symbol replacement
| required |
+| replace-symbol | Special symbol in sink-table that will be replaced by the
source table name | optional |
+| description | Routing rule description (a default value provided)
| optional |
A route module can contain a list of source-table/sink-table rules.
@@ -71,4 +72,18 @@ route:
- source-table: mydb.products
sink-table: ods_db.ods_products
description: sync products table to ods_products
-```
\ No newline at end of file
+```
+
+## Pattern Replacement in routing rules
+
+If you'd like to route source tables to sink tables whose names follow a
specific pattern, `replace-symbol` can be used to refer to the source table
name like this:
+
+```yaml
+route:
+ - source-table: source_db.\.*
+ sink-table: sink_db.<>
+ replace-symbol: <>
+ description: route all tables in source_db to sink_db
+```
+
+Then, every table like `source_db.XXX` will be routed to `sink_db.XXX`,
keeping its original table name, without hassle.
\ No newline at end of file
diff --git a/docs/content/docs/core-concept/route.md
b/docs/content/docs/core-concept/route.md
index 5e6920579..ca5855d58 100644
--- a/docs/content/docs/core-concept/route.md
+++ b/docs/content/docs/core-concept/route.md
@@ -30,11 +30,12 @@ under the License.
# Parameters
To describe a route, the follows are required:
-| parameter | meaning |
optional/required |
-|--------------|----------------------------------------------------|-------------------|
-| source-table | Source table id, supports regular expressions | required
|
-| sink-table | Sink table id, supports regular expressions | required
|
-| description | Routing rule description(a default value provided) | optional
|
+| parameter | meaning
| optional/required |
+|----------------|---------------------------------------------------------------------------------------------|-------------------|
+| source-table | Source table id, supports regular expressions
| required |
+| sink-table | Sink table id, supports symbol replacement
| required |
+| replace-symbol | Special symbol in sink-table that will be replaced by the
source table name | optional |
+| description | Routing rule description (a default value provided)
| optional |
A route module can contain a list of source-table/sink-table rules.
@@ -71,4 +72,18 @@ route:
- source-table: mydb.products
sink-table: ods_db.ods_products
description: sync products table to ods_products
-```
\ No newline at end of file
+```
+
+## Pattern Replacement in routing rules
+
+If you'd like to route source tables to sink tables whose names follow a
specific pattern, `replace-symbol` can be used to refer to the source table
name like this:
+
+```yaml
+route:
+ - source-table: source_db.\.*
+ sink-table: sink_db.<>
+ replace-symbol: <>
+ description: route all tables in source_db to sink_db
+```
+
+Then, every table like `source_db.XXX` will be routed to `sink_db.XXX`,
keeping its original table name, without hassle.
\ No newline at end of file
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 23b2c63ff..7ad07af1a 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -55,6 +55,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
// Route keys
private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
+ private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
private static final String ROUTE_DESCRIPTION_KEY = "description";
// Transform keys
@@ -164,11 +165,15 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
"Missing required field \"%s\" in route
configuration",
ROUTE_SINK_TABLE_KEY)
.asText();
+ String replaceSymbol =
+ Optional.ofNullable(routeNode.get(ROUTE_REPLACE_SYMBOL))
+ .map(JsonNode::asText)
+ .orElse(null);
String description =
Optional.ofNullable(routeNode.get(ROUTE_DESCRIPTION_KEY))
.map(JsonNode::asText)
.orElse(null);
- return new RouteDef(sourceTable, sinkTable, description);
+ return new RouteDef(sourceTable, sinkTable, replaceSymbol,
description);
}
private TransformDef toTransformDef(JsonNode transformNode) {
diff --git
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 2d05bcbde..75dc5bd62 100644
---
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -166,6 +166,15 @@ class YamlPipelineDefinitionParserTest {
+ "Or use 'UTC' without time zone and daylight
saving time.");
}
+ @Test
+ void testRouteWithReplacementSymbol() throws Exception {
+ URL resource =
+
Resources.getResource("definitions/pipeline-definition-full-with-repsym.yaml");
+ YamlPipelineDefinitionParser parser = new
YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()),
new Configuration());
+ assertThat(pipelineDef).isEqualTo(fullDefWithRouteRepSym);
+ }
+
private final PipelineDef fullDef =
new PipelineDef(
new SourceDef(
@@ -197,10 +206,12 @@ class YamlPipelineDefinitionParserTest {
new RouteDef(
"mydb.default.app_order_.*",
"odsdb.default.app_order",
+ null,
"sync all sharding tables to one"),
new RouteDef(
"mydb.default.web_order",
"odsdb.default.ods_web_order",
+ null,
"sync table to with given prefix ods_")),
Arrays.asList(
new TransformDef(
@@ -258,10 +269,12 @@ class YamlPipelineDefinitionParserTest {
new RouteDef(
"mydb.default.app_order_.*",
"odsdb.default.app_order",
+ null,
"sync all sharding tables to one"),
new RouteDef(
"mydb.default.web_order",
"odsdb.default.ods_web_order",
+ null,
"sync table to with given prefix ods_")),
Arrays.asList(
new TransformDef(
@@ -312,7 +325,10 @@ class YamlPipelineDefinitionParserTest {
.build())),
Collections.singletonList(
new RouteDef(
- "mydb.default.app_order_.*",
"odsdb.default.app_order", null)),
+ "mydb.default.app_order_.*",
+ "odsdb.default.app_order",
+ null,
+ null)),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
@@ -326,4 +342,67 @@ class YamlPipelineDefinitionParserTest {
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));
+
+ private final PipelineDef fullDefWithRouteRepSym =
+ new PipelineDef(
+ new SourceDef(
+ "mysql",
+ "source-database",
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("host", "localhost")
+ .put("port", "3306")
+ .put("username", "admin")
+ .put("password", "pass")
+ .put(
+ "tables",
+ "adb.*,
bdb.user_table_[0-9]+, [app|web]_order_.*")
+ .put(
+ "chunk-column",
+
"app_order_.*:id,web_order:product_id")
+ .put("capture-new-tables", "true")
+ .build())),
+ new SinkDef(
+ "kafka",
+ "sink-queue",
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("bootstrap-servers",
"localhost:9092")
+ .put("auto-create-table", "true")
+ .build())),
+ Arrays.asList(
+ new RouteDef(
+ "mydb.default.app_order_.*",
+ "odsdb.default.app_order_<>",
+ "<>",
+ "sync all sharding tables to one"),
+ new RouteDef(
+ "mydb.default.web_order",
+ "odsdb.default.ods_web_order_>_<",
+ ">_<",
+ "sync table to with given prefix ods_")),
+ Arrays.asList(
+ new TransformDef(
+ "mydb.app_order_.*",
+ "id, order_id, TO_UPPER(product_name)",
+ "id > 10 AND order_id > 100",
+ "id",
+ "product_name",
+ "comment=app order",
+ "project fields from source table"),
+ new TransformDef(
+ "mydb.web_order_.*",
+ "CONCAT(id, order_id) as uniq_id, *",
+ "uniq_id > 10",
+ null,
+ null,
+ null,
+ "add new uniq_id for each row")),
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("name", "source-database-sync-pipe")
+ .put("parallelism", "4")
+ .put("schema.change.behavior", "evolve")
+ .put("schema-operator.rpc-timeout", "1 h")
+ .build()));
}
diff --git
a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
new file mode 100644
index 000000000..265358fb4
--- /dev/null
+++
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
@@ -0,0 +1,61 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+source:
+ type: mysql
+ name: source-database
+ host: localhost
+ port: 3306
+ username: admin
+ password: pass
+ tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
+ chunk-column: app_order_.*:id,web_order:product_id
+ capture-new-tables: true
+
+sink:
+ type: kafka
+ name: sink-queue
+ bootstrap-servers: localhost:9092
+ auto-create-table: true
+
+route:
+ - source-table: mydb.default.app_order_.*
+ sink-table: odsdb.default.app_order_<>
+ replace-symbol: "<>"
+ description: sync all sharding tables to one
+ - source-table: mydb.default.web_order
+ sink-table: odsdb.default.ods_web_order_>_<
+ replace-symbol: ">_<"
+ description: sync table to with given prefix ods_
+
+transform:
+ - source-table: mydb.app_order_.*
+ projection: id, order_id, TO_UPPER(product_name)
+ filter: id > 10 AND order_id > 100
+ primary-keys: id
+ partition-keys: product_name
+ table-options: comment=app order
+ description: project fields from source table
+ - source-table: mydb.web_order_.*
+ projection: CONCAT(id, order_id) as uniq_id, *
+ filter: uniq_id > 10
+ description: add new uniq_id for each row
+
+pipeline:
+ name: source-database-sync-pipe
+ parallelism: 4
+ schema.change.behavior: evolve
+ schema-operator.rpc-timeout: 1 h
\ No newline at end of file
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
new file mode 100644
index 000000000..4fbfb61b6
--- /dev/null
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.route;
+
+import java.io.Serializable;
+
+/** A routing rule: source table selector, sink table name, and optional replace symbol. */
+public class RouteRule implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public RouteRule(String sourceTable, String sinkTable, String
replaceSymbol) {
+ this.sourceTable = sourceTable;
+ this.sinkTable = sinkTable;
+ this.replaceSymbol = replaceSymbol;
+ }
+
+ public String sourceTable; // Source table selector expression (may be a regex pattern).
+ public String sinkTable; // Sink table name; may contain replaceSymbol as a placeholder.
+ public String replaceSymbol; // Placeholder replaced by the source table name; may be null.
+}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
index ee55eb366..868c5c67f 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
@@ -36,11 +36,17 @@ import java.util.Optional;
public class RouteDef {
private final String sourceTable;
private final String sinkTable;
+ private final String replaceSymbol;
@Nullable private final String description;
- public RouteDef(String sourceTable, String sinkTable, @Nullable String
description) {
+ public RouteDef(
+ String sourceTable,
+ String sinkTable,
+ @Nullable String replaceSymbol,
+ @Nullable String description) {
this.sourceTable = sourceTable;
this.sinkTable = sinkTable;
+ this.replaceSymbol = replaceSymbol;
this.description = description;
}
@@ -52,6 +58,10 @@ public class RouteDef {
return sinkTable;
}
+ public Optional<String> getReplaceSymbol() {
+ return Optional.ofNullable(replaceSymbol);
+ }
+
public Optional<String> getDescription() {
return Optional.ofNullable(description);
}
@@ -63,6 +73,8 @@ public class RouteDef {
+ sourceTable
+ ", sinkTable="
+ sinkTable
+ + ", replaceSymbol="
+ + replaceSymbol
+ ", description='"
+ description
+ '\''
@@ -80,11 +92,12 @@ public class RouteDef {
RouteDef routeDef = (RouteDef) o;
return Objects.equals(sourceTable, routeDef.sourceTable)
&& Objects.equals(sinkTable, routeDef.sinkTable)
+ && Objects.equals(replaceSymbol, routeDef.replaceSymbol)
&& Objects.equals(description, routeDef.description);
}
@Override
public int hashCode() {
- return Objects.hash(sourceTable, sinkTable, description);
+ return Objects.hash(sourceTable, sinkTable, replaceSymbol,
description);
}
}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
index a69741c7b..1f5dc44bd 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
@@ -17,12 +17,11 @@
package org.apache.flink.cdc.composer.flink.translator;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
@@ -80,10 +79,13 @@ public class SchemaOperatorTranslator {
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
- List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
+ List<RouteRule> routingRules = new ArrayList<>();
for (RouteDef route : routes) {
routingRules.add(
- Tuple2.of(route.getSourceTable(),
TableId.parse(route.getSinkTable())));
+ new RouteRule(
+ route.getSourceTable(),
+ route.getSinkTable(),
+ route.getReplaceSymbol().orElse(null)));
}
SingleOutputStreamOperator<Event> stream =
input.transform(
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index b59f4ead6..26c9c9187 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -424,8 +424,8 @@ class FlinkPipelineComposerITCase {
TableId routedTable2 = TableId.tableId("default_namespace",
"default_schema", "routed2");
List<RouteDef> routeDef =
Arrays.asList(
- new RouteDef(TABLE_1.toString(),
routedTable1.toString(), null),
- new RouteDef(TABLE_2.toString(),
routedTable2.toString(), null));
+ new RouteDef(TABLE_1.toString(),
routedTable1.toString(), null, null),
+ new RouteDef(TABLE_2.toString(),
routedTable2.toString(), null, null));
// Setup pipeline
Configuration pipelineConfig = new Configuration();
@@ -616,6 +616,7 @@ class FlinkPipelineComposerITCase {
new RouteDef(
"default_namespace.default_schema.mytable[0-9]",
mergedTable.toString(),
+ null,
null));
// Setup pipeline
@@ -657,4 +658,62 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[],
after=[5, null, 24, null, Eliza, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[],
after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
}
+
+ @ParameterizedTest
+ @EnumSource
+ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws
Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.singletonList(
+ new RouteDef(
+
"default_namespace.default_schema.table[0-9]",
+
"replaced_namespace.replaced_schema.__$__",
+ "__$__",
+ null)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+ assertThat(outputEvents)
+ .containsExactly(
+
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2,
schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[],
after=[1, 1], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[],
after=[2, 2], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[],
after=[3, 3], op=INSERT, meta=()}",
+
"AddColumnEvent{tableId=replaced_namespace.replaced_schema.table1,
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST,
existedColumnName=null}]}",
+
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[],
after=[1, 1], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[],
after=[2, 2], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[],
after=[3, 3], op=INSERT, meta=()}",
+
"RenameColumnEvent{tableId=replaced_namespace.replaced_schema.table1,
nameMapping={col2=newCol2, col3=newCol3}}",
+
"DropColumnEvent{tableId=replaced_namespace.replaced_schema.table1,
droppedColumnNames=[newCol2]}",
+
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1,
1], after=[], op=DELETE, meta=()}",
+
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2,
2], after=[2, x], op=UPDATE, meta=()}");
+ }
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index 4ff001e79..e674d57d0 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -624,6 +624,102 @@ public class RouteE2eITCase extends
PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[10004], op=INSERT, meta=()}");
}
+ @Test
+ public void testReplacementSymbol() throws Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "route:\n"
+ + " - source-table: %s.\\.*\n"
+ + " sink-table: NEW_%s.NEW_<>\n"
+ + " replace-symbol: <>\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: 1",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ routeTestDatabase.getDatabaseName(),
+ routeTestDatabase.getDatabaseName(),
+ routeTestDatabase.getDatabaseName());
+ Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar,
mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+
+ waitUntilSpecificEvent(
+ String.format(
+ "CreateTableEvent{tableId=NEW_%s.NEW_TABLEALPHA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ routeTestDatabase.getDatabaseName()));
+ waitUntilSpecificEvent(
+ String.format(
+ "CreateTableEvent{tableId=NEW_%s.NEW_TABLEBETA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ routeTestDatabase.getDatabaseName()));
+ waitUntilSpecificEvent(
+ String.format(
+ "CreateTableEvent{tableId=NEW_%s.NEW_TABLEGAMMA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ routeTestDatabase.getDatabaseName()));
+ waitUntilSpecificEvent(
+ String.format(
+ "CreateTableEvent{tableId=NEW_%s.NEW_TABLEDELTA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ routeTestDatabase.getDatabaseName()));
+
+ validateResult(
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[],
after=[1008, 8], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[],
after=[1009, 8.1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[],
after=[1010, 10], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[],
after=[1011, 11], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[],
after=[2011, 11], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[],
after=[2012, 12], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[],
after=[2013, 13], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[],
after=[2014, 14], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[],
after=[3015, Amber], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[],
after=[3016, Black], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[],
after=[3017, Cyan], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[],
after=[3018, Denim], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[],
after=[4019, Yosemite], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[],
after=[4020, El Capitan], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[],
after=[4021, Sierra], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[],
after=[4022, High Sierra], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[],
after=[4023, Mojave], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[],
after=[4024, Catalina], op=INSERT, meta=()}");
+
+ LOG.info("Begin incremental reading stage.");
+
+ generateIncrementalChanges();
+
+ validateResult(
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[],
after=[3007, 7], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[2014,
14], after=[2014, 2014], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[],
after=[3019, Emerald], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[4024,
Catalina], after=[], op=DELETE, meta=()}");
+
+ generateSchemaChanges();
+ validateResult(
+ "AddColumnEvent{tableId=NEW_%s.NEW_TABLEALPHA,
addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=LAST,
existedColumnName=null}]}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[],
after=[10001, 12, Derrida], op=INSERT, meta=()}",
+ "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA,
nameMapping={VERSION=VERSION_EX}}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[],
after=[10002, 15], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA,
nameMapping={VERSION=VARCHAR(19)}}",
+ "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA,
nameMapping={VERSION=VERSION_EX}}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[],
after=[10003, Fluorite], op=INSERT, meta=()}",
+ "DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA,
droppedColumnNames=[VERSION]}",
+ "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[],
after=[10004], op=INSERT, meta=()}");
+ }
+
private void validateResult(String... expectedEvents) throws Exception {
for (String event : expectedEvents) {
waitUntilSpecificEvent(String.format(event,
routeTestDatabase.getDatabaseName()));
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 430e7f263..d1f468bfe 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -17,7 +17,7 @@
package org.apache.flink.cdc.runtime.operators.schema;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
@@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
@@ -70,6 +71,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -88,22 +90,33 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
private static final Logger LOG =
LoggerFactory.getLogger(SchemaOperator.class);
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
- private final List<Tuple2<String, TableId>> routingRules;
+ private final List<RouteRule> routingRules;
+
+ /**
+ * Storing route source table selector, sink table name (before symbol
replacement), and replace
+ * symbol in a tuple.
+ */
+ private transient List<Tuple3<Selectors, String, String>> routes;
- private transient List<Tuple2<Selectors, TableId>> routes;
private transient TaskOperatorEventGateway toCoordinator;
private transient SchemaEvolutionClient schemaEvolutionClient;
private transient LoadingCache<TableId, Schema> cachedSchemas;
+ /**
+ * Storing mapping relations between upstream tableId (source table)
mapping to downstream
+ * tableIds (sink tables).
+ */
+ private transient LoadingCache<TableId, List<TableId>> tableIdMappingCache;
+
private final long rpcTimeOutInMillis;
- public SchemaOperator(List<Tuple2<String, TableId>> routingRules) {
+ public SchemaOperator(List<RouteRule> routingRules) {
this.routingRules = routingRules;
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeOutInMillis =
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
}
- public SchemaOperator(List<Tuple2<String, TableId>> routingRules, Duration
rpcTimeOut) {
+ public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
this.routingRules = routingRules;
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
@@ -119,14 +132,14 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
routes =
routingRules.stream()
.map(
- tuple2 -> {
- String tableInclusions = tuple2.f0;
- TableId replaceBy = tuple2.f1;
+ rule -> {
+ String tableInclusions = rule.sourceTable;
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables(tableInclusions)
.build();
- return new Tuple2<>(selectors, replaceBy);
+ return new Tuple3<>(
+ selectors, rule.sinkTable,
rule.replaceSymbol);
})
.collect(Collectors.toList());
schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator,
getOperatorID());
@@ -140,6 +153,16 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
return getLatestSchema(tableId);
}
});
+ tableIdMappingCache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(CACHE_EXPIRE_DURATION)
+ .build(
+ new CacheLoader<TableId, List<TableId>>() {
+ @Override
+ public List<TableId> load(TableId tableId)
{
+ return getRoutedTables(tableId);
+ }
+ });
}
@Override
@@ -158,7 +181,7 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
*/
@Override
public void processElement(StreamRecord<Event> streamRecord)
- throws InterruptedException, TimeoutException {
+ throws InterruptedException, TimeoutException, ExecutionException {
Event event = streamRecord.getValue();
// Schema changes
if (event instanceof SchemaChangeEvent) {
@@ -169,15 +192,15 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
// Update caches
cachedSchemas.put(tableId, getLatestSchema(tableId));
- getRoutedTables(tableId)
+ tableIdMappingCache
+ .get(tableId)
.forEach(routed -> cachedSchemas.put(routed,
getLatestSchema(routed)));
return;
}
// Data changes
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
- TableId tableId = dataChangeEvent.tableId();
- List<TableId> optionalRoutedTable = getRoutedTables(tableId);
+ List<TableId> optionalRoutedTable =
tableIdMappingCache.get(dataChangeEvent.tableId());
if (optionalRoutedTable.isEmpty()) {
output.collect(streamRecord);
} else {
@@ -270,10 +293,18 @@ public class SchemaOperator extends
AbstractStreamOperator<Event>
private List<TableId> getRoutedTables(TableId originalTableId) {
return routes.stream()
.filter(route -> route.f0.isMatch(originalTableId))
- .map(route -> route.f1)
+ .map(route -> resolveReplacement(originalTableId, route))
.collect(Collectors.toList());
}
+ private TableId resolveReplacement(
+ TableId originalTable, Tuple3<Selectors, String, String> route) {
+ if (route.f2 != null) {
+ return TableId.parse(route.f1.replace(route.f2,
originalTable.getTableName()));
+ }
+ return TableId.parse(route.f1);
+ }
+
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent
schemaChangeEvent)
throws InterruptedException, TimeoutException {
// The request will need to send a FlushEvent or block until flushing
finished
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
index ecf500171..eba7c7712 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
@@ -17,10 +17,9 @@
package org.apache.flink.cdc.runtime.operators.schema;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -40,12 +39,10 @@ public class SchemaOperatorFactory extends
SimpleOperatorFactory<Event>
private static final long serialVersionUID = 1L;
private final MetadataApplier metadataApplier;
- private final List<Tuple2<String, TableId>> routingRules;
+ private final List<RouteRule> routingRules;
public SchemaOperatorFactory(
- MetadataApplier metadataApplier,
- List<Tuple2<String, TableId>> routingRules,
- Duration rpcTimeOut) {
+ MetadataApplier metadataApplier, List<RouteRule> routingRules,
Duration rpcTimeOut) {
super(new SchemaOperator(routingRules, rpcTimeOut));
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
index 7b8538382..b936da608 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
@@ -17,7 +17,7 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
@@ -48,19 +49,37 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
/** Derive schema changes based on the routing rules. */
public class SchemaDerivation {
private final SchemaManager schemaManager;
- private final List<Tuple2<Selectors, TableId>> routes;
private final Map<TableId, Set<TableId>> derivationMapping;
+ /**
+ * Storing route source table selector, sink table name (before symbol
replacement), and replace
+ * symbol in a tuple.
+ */
+ private transient List<Tuple3<Selectors, String, String>> routes;
+
public SchemaDerivation(
SchemaManager schemaManager,
- List<Tuple2<Selectors, TableId>> routes,
+ List<RouteRule> routeRules,
Map<TableId, Set<TableId>> derivationMapping) {
this.schemaManager = schemaManager;
- this.routes = routes;
+ this.routes =
+ routeRules.stream()
+ .map(
+ rule -> {
+ String tableInclusions = rule.sourceTable;
+ Selectors selectors =
+ new Selectors.SelectorsBuilder()
+
.includeTables(tableInclusions)
+ .build();
+ return new Tuple3<>(
+ selectors, rule.sinkTable,
rule.replaceSymbol);
+ })
+ .collect(Collectors.toList());
this.derivationMapping = derivationMapping;
}
@@ -69,7 +88,7 @@ public class SchemaDerivation {
TableId originalTable = schemaChangeEvent.tableId();
boolean noRouteMatched = true;
- for (Tuple2<Selectors, TableId> route : routes) {
+ for (Tuple3<Selectors, String, String> route : routes) {
// Check routing table
if (!route.f0.isMatch(originalTable)) {
continue;
@@ -78,7 +97,7 @@ public class SchemaDerivation {
noRouteMatched = false;
// Matched a routing rule
- TableId derivedTable = route.f1;
+ TableId derivedTable = resolveReplacement(originalTable, route);
Set<TableId> originalTables =
derivationMapping.computeIfAbsent(derivedTable, t -> new
HashSet<>());
originalTables.add(originalTable);
@@ -134,6 +153,14 @@ public class SchemaDerivation {
}
}
+ private TableId resolveReplacement(
+ TableId originalTable, Tuple3<Selectors, String, String> route) {
+ if (route.f2 != null) {
+ return TableId.parse(route.f1.replace(route.f2,
originalTable.getTableName()));
+ }
+ return TableId.parse(route.f1);
+ }
+
public Map<TableId, Set<TableId>> getDerivationMapping() {
return derivationMapping;
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 2c718dcec..02abb8903 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -17,9 +17,8 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
@@ -90,7 +89,7 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
/** Metadata applier for applying schema changes to external system. */
private final MetadataApplier metadataApplier;
- private final List<Tuple2<Selectors, TableId>> routes;
+ private final List<RouteRule> routes;
/** The request handler that handle all requests and events. */
private SchemaRegistryRequestHandler requestHandler;
@@ -104,7 +103,7 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
String operatorName,
OperatorCoordinator.Context context,
MetadataApplier metadataApplier,
- List<Tuple2<Selectors, TableId>> routes) {
+ List<RouteRule> routes) {
this.context = context;
this.operatorName = operatorName;
this.failedReasons = new HashMap<>();
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
index 1f6e7aaf5..0db0cf3a7 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
@@ -17,16 +17,13 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import java.util.List;
-import java.util.stream.Collectors;
/** Provider of {@link SchemaRegistry}. */
@Internal
@@ -36,13 +33,13 @@ public class SchemaRegistryProvider implements
OperatorCoordinator.Provider {
private final OperatorID operatorID;
private final String operatorName;
private final MetadataApplier metadataApplier;
- private final List<Tuple2<String, TableId>> routingRules;
+ private final List<RouteRule> routingRules;
public SchemaRegistryProvider(
OperatorID operatorID,
String operatorName,
MetadataApplier metadataApplier,
- List<Tuple2<String, TableId>> routingRules) {
+ List<RouteRule> routingRules) {
this.operatorID = operatorID;
this.operatorName = operatorName;
this.metadataApplier = metadataApplier;
@@ -56,19 +53,6 @@ public class SchemaRegistryProvider implements
OperatorCoordinator.Provider {
@Override
public OperatorCoordinator create(OperatorCoordinator.Context context)
throws Exception {
- List<Tuple2<Selectors, TableId>> routes =
- routingRules.stream()
- .map(
- tuple2 -> {
- String tableInclusions = tuple2.f0;
- TableId replaceBy = tuple2.f1;
- Selectors selectors =
- new Selectors.SelectorsBuilder()
-
.includeTables(tableInclusions)
- .build();
- return new Tuple2<>(selectors, replaceBy);
- })
- .collect(Collectors.toList());
- return new SchemaRegistry(operatorName, context, metadataApplier,
routes);
+ return new SchemaRegistry(operatorName, context, metadataApplier,
routingRules);
}
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
index 19fd5a4a1..adaf3b140 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -25,10 +24,10 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
@@ -81,13 +80,9 @@ class SchemaDerivationTest {
.column(Column.physicalColumn("gender",
DataTypes.STRING()))
.build();
- private static final List<Tuple2<Selectors, TableId>> ROUTES =
+ private static final List<RouteRule> ROUTES =
Collections.singletonList(
- Tuple2.of(
- new Selectors.SelectorsBuilder()
-
.includeTables("mydb.myschema.mytable[0-9]")
- .build(),
- MERGED_TABLE));
+ new RouteRule("mydb.myschema.mytable[0-9]",
MERGED_TABLE.toString(), null));
@Test
void testOneToOneMapping() {