This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dabfc081 [FLINK-35730][cdc-cli] PipelineDefinitionParser supports parsing pipeline def in text format
2dabfc081 is described below

commit 2dabfc08150c37741f2d8fca77b37b4be77ea1c0
Author: Wink <[email protected]>
AuthorDate: Thu Aug 8 20:38:26 2024 +0800

    [FLINK-35730][cdc-cli] PipelineDefinitionParser supports parsing pipeline def in text format
    
    This closes #3444.
---
 .../cdc/cli/parser/PipelineDefinitionParser.java   |  6 +++
 .../cli/parser/YamlPipelineDefinitionParser.java   | 21 ++++++---
 .../parser/YamlPipelineDefinitionParserTest.java   | 51 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 6 deletions(-)

diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
index c7eef56c1..bfcde27bd 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
@@ -30,4 +30,10 @@ public interface PipelineDefinitionParser {
      * the {@link PipelineDef}.
      */
     PipelineDef parse(Path pipelineDefPath, Configuration 
globalPipelineConfig) throws Exception;
+
+    /**
+     * Parse the specified pipeline definition string, merge global 
configurations, then generate
+     * the {@link PipelineDef}.
+     */
+    PipelineDef parse(String pipelineDefText, Configuration 
globalPipelineConfig) throws Exception;
 }
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index cfee1f810..907462f9f 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -82,13 +82,22 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     @Override
     public PipelineDef parse(Path pipelineDefPath, Configuration 
globalPipelineConfig)
             throws Exception {
-        JsonNode root = mapper.readTree(pipelineDefPath.toFile());
+        return parse(mapper.readTree(pipelineDefPath.toFile()), 
globalPipelineConfig);
+    }
+
+    @Override
+    public PipelineDef parse(String pipelineDefText, Configuration 
globalPipelineConfig)
+            throws Exception {
+        return parse(mapper.readTree(pipelineDefText), globalPipelineConfig);
+    }
 
+    private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration 
globalPipelineConfig)
+            throws Exception {
         // Source is required
         SourceDef sourceDef =
                 toSourceDef(
                         checkNotNull(
-                                root.get(SOURCE_KEY),
+                                pipelineDefJsonNode.get(SOURCE_KEY),
                                 "Missing required field \"%s\" in pipeline 
definition",
                                 SOURCE_KEY));
 
@@ -96,13 +105,13 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         SinkDef sinkDef =
                 toSinkDef(
                         checkNotNull(
-                                root.get(SINK_KEY),
+                                pipelineDefJsonNode.get(SINK_KEY),
                                 "Missing required field \"%s\" in pipeline 
definition",
                                 SINK_KEY));
 
         // Transforms are optional
         List<TransformDef> transformDefs = new ArrayList<>();
-        Optional.ofNullable(root.get(TRANSFORM_KEY))
+        Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
                 .ifPresent(
                         node ->
                                 node.forEach(
@@ -110,11 +119,11 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
 
         // Routes are optional
         List<RouteDef> routeDefs = new ArrayList<>();
-        Optional.ofNullable(root.get(ROUTE_KEY))
+        Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
                 .ifPresent(node -> node.forEach(route -> 
routeDefs.add(toRouteDef(route))));
 
         // Pipeline configs are optional
-        Configuration userPipelineConfig = 
toPipelineConfig(root.get(PIPELINE_KEY));
+        Configuration userPipelineConfig = 
toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
 
         // Merge user config into global config
         Configuration pipelineConfig = new Configuration();
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 75dc5bd62..a26dfa8cd 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -238,6 +238,57 @@ class YamlPipelineDefinitionParserTest {
                                     .put("schema-operator.rpc-timeout", "1 h")
                                     .build()));
 
+    @Test
+    void testParsingFullDefinitionFromString() throws Exception {
+        String pipelineDefText =
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  name: source-database\n"
+                        + "  host: localhost\n"
+                        + "  port: 3306\n"
+                        + "  username: admin\n"
+                        + "  password: pass\n"
+                        + "  tables: adb.*, bdb.user_table_[0-9]+, 
[app|web]_order_.*\n"
+                        + "  chunk-column: 
app_order_.*:id,web_order:product_id\n"
+                        + "  capture-new-tables: true\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: kafka\n"
+                        + "  name: sink-queue\n"
+                        + "  bootstrap-servers: localhost:9092\n"
+                        + "  auto-create-table: true\n"
+                        + "\n"
+                        + "route:\n"
+                        + "  - source-table: mydb.default.app_order_.*\n"
+                        + "    sink-table: odsdb.default.app_order\n"
+                        + "    description: sync all sharding tables to one\n"
+                        + "  - source-table: mydb.default.web_order\n"
+                        + "    sink-table: odsdb.default.ods_web_order\n"
+                        + "    description: sync table to with given prefix 
ods_\n"
+                        + "\n"
+                        + "transform:\n"
+                        + "  - source-table: mydb.app_order_.*\n"
+                        + "    projection: id, order_id, 
TO_UPPER(product_name)\n"
+                        + "    filter: id > 10 AND order_id > 100\n"
+                        + "    primary-keys: id\n"
+                        + "    partition-keys: product_name\n"
+                        + "    table-options: comment=app order\n"
+                        + "    description: project fields from source table\n"
+                        + "  - source-table: mydb.web_order_.*\n"
+                        + "    projection: CONCAT(id, order_id) as uniq_id, 
*\n"
+                        + "    filter: uniq_id > 10\n"
+                        + "    description: add new uniq_id for each row\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  name: source-database-sync-pipe\n"
+                        + "  parallelism: 4\n"
+                        + "  schema.change.behavior: evolve\n"
+                        + "  schema-operator.rpc-timeout: 1 h";
+        YamlPipelineDefinitionParser parser = new 
YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = parser.parse(pipelineDefText, new 
Configuration());
+        assertThat(pipelineDef).isEqualTo(fullDef);
+    }
+
     private final PipelineDef fullDefWithGlobalConf =
             new PipelineDef(
                     new SourceDef(

Reply via email to