This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 2dabfc081 [FLINK-35730][cdc-cli] PipelineDefinitionParser supports parsing pipeline def in text format
2dabfc081 is described below
commit 2dabfc08150c37741f2d8fca77b37b4be77ea1c0
Author: Wink <[email protected]>
AuthorDate: Thu Aug 8 20:38:26 2024 +0800
[FLINK-35730][cdc-cli] PipelineDefinitionParser supports parsing pipeline def in text format
This closes #3444.
---
.../cdc/cli/parser/PipelineDefinitionParser.java | 6 +++
.../cli/parser/YamlPipelineDefinitionParser.java | 21 ++++++---
.../parser/YamlPipelineDefinitionParserTest.java | 51 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 6 deletions(-)
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
index c7eef56c1..bfcde27bd 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
@@ -30,4 +30,10 @@ public interface PipelineDefinitionParser {
* the {@link PipelineDef}.
*/
PipelineDef parse(Path pipelineDefPath, Configuration
globalPipelineConfig) throws Exception;
+
+ /**
+ * Parse the specified pipeline definition string, merge global configurations, then generate
+ * the {@link PipelineDef}.
+ */
+ PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig) throws Exception;
}
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index cfee1f810..907462f9f 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -82,13 +82,22 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
@Override
public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig)
throws Exception {
- JsonNode root = mapper.readTree(pipelineDefPath.toFile());
+ return parse(mapper.readTree(pipelineDefPath.toFile()), globalPipelineConfig);
+ }
+
+ @Override
+ public PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig)
+ throws Exception {
+ return parse(mapper.readTree(pipelineDefText), globalPipelineConfig);
+ }
+ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration
globalPipelineConfig)
+ throws Exception {
// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
- root.get(SOURCE_KEY),
+ pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline
definition",
SOURCE_KEY));
@@ -96,13 +105,13 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
SinkDef sinkDef =
toSinkDef(
checkNotNull(
- root.get(SINK_KEY),
+ pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline
definition",
SINK_KEY));
// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
- Optional.ofNullable(root.get(TRANSFORM_KEY))
+ Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
.ifPresent(
node ->
node.forEach(
@@ -110,11 +119,11 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
// Routes are optional
List<RouteDef> routeDefs = new ArrayList<>();
- Optional.ofNullable(root.get(ROUTE_KEY))
+ Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route ->
routeDefs.add(toRouteDef(route))));
// Pipeline configs are optional
- Configuration userPipelineConfig =
toPipelineConfig(root.get(PIPELINE_KEY));
+ Configuration userPipelineConfig =
toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
// Merge user config into global config
Configuration pipelineConfig = new Configuration();
diff --git
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 75dc5bd62..a26dfa8cd 100644
---
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -238,6 +238,57 @@ class YamlPipelineDefinitionParserTest {
.put("schema-operator.rpc-timeout", "1 h")
.build()));
+ @Test
+ void testParsingFullDefinitionFromString() throws Exception {
+ String pipelineDefText =
+ "source:\n"
+ + " type: mysql\n"
+ + " name: source-database\n"
+ + " host: localhost\n"
+ + " port: 3306\n"
+ + " username: admin\n"
+ + " password: pass\n"
+ + " tables: adb.*, bdb.user_table_[0-9]+,
[app|web]_order_.*\n"
+ + " chunk-column:
app_order_.*:id,web_order:product_id\n"
+ + " capture-new-tables: true\n"
+ + "\n"
+ + "sink:\n"
+ + " type: kafka\n"
+ + " name: sink-queue\n"
+ + " bootstrap-servers: localhost:9092\n"
+ + " auto-create-table: true\n"
+ + "\n"
+ + "route:\n"
+ + " - source-table: mydb.default.app_order_.*\n"
+ + " sink-table: odsdb.default.app_order\n"
+ + " description: sync all sharding tables to one\n"
+ + " - source-table: mydb.default.web_order\n"
+ + " sink-table: odsdb.default.ods_web_order\n"
+ + " description: sync table to with given prefix
ods_\n"
+ + "\n"
+ + "transform:\n"
+ + " - source-table: mydb.app_order_.*\n"
+ + " projection: id, order_id,
TO_UPPER(product_name)\n"
+ + " filter: id > 10 AND order_id > 100\n"
+ + " primary-keys: id\n"
+ + " partition-keys: product_name\n"
+ + " table-options: comment=app order\n"
+ + " description: project fields from source table\n"
+ + " - source-table: mydb.web_order_.*\n"
+ + " projection: CONCAT(id, order_id) as uniq_id,
*\n"
+ + " filter: uniq_id > 10\n"
+ + " description: add new uniq_id for each row\n"
+ + "\n"
+ + "pipeline:\n"
+ + " name: source-database-sync-pipe\n"
+ + " parallelism: 4\n"
+ + " schema.change.behavior: evolve\n"
+ + " schema-operator.rpc-timeout: 1 h";
+ YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef = parser.parse(pipelineDefText, new Configuration());
+ assertThat(pipelineDef).isEqualTo(fullDef);
+ }
+
private final PipelineDef fullDefWithGlobalConf =
new PipelineDef(
new SourceDef(