This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 8edc345b1 [FLINK-35633] Add more YAML structure validations & provide better error messages (#4172)
8edc345b1 is described below

commit 8edc345b1a1cc34d542bc9ca69c30ae1082bbf18
Author: yuxiqian <[email protected]>
AuthorDate: Tue Dec 2 20:04:17 2025 +0800

    [FLINK-35633] Add more YAML structure validations & provide better error messages (#4172)
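
    For illustration (this example is not itself part of the patch), a
    definition carrying a stray top-level key such as

        source:
          type: values
        sink:
          type: values
        mapping: []

    is now rejected at parse time with:

        Unexpected key `mapping` in YAML top-level block.
        Allowed keys in this context are: [source, sink, route, transform, pipeline]
        Note: Flink configurations should be defined in "Runtime Configurations" instead of YAML scripts.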
---
 .../cli/parser/YamlPipelineDefinitionParser.java   |  92 ++++++++++
 .../definitions/pipeline-definition-full.yaml      |  10 +-
 .../cdc/pipeline/tests/TransformE2eITCase.java     |   2 +-
 .../tests/migration/YamlJobMigrationITCase.java    |   2 +-
 .../flink/cdc/pipeline/tests/specs/CheckStep.java  |  31 ++++
 .../flink/cdc/pipeline/tests/specs/ExecStep.java   |  27 +++
 .../tests/specs/FlinkPipelineSpecsITCase.java      | 199 +++++++++++++++++++++
 .../flink/cdc/pipeline/tests/specs/RuleSpec.java   |  33 ++++
 .../flink/cdc/pipeline/tests/specs/SpecStep.java   |  21 +++
 .../flink/cdc/pipeline/tests/specs/SubmitStep.java |  42 +++++
 .../tests/utils/PipelineTestEnvironment.java       |  17 +-
 .../src/test/resources/rules/incomplete.yaml       |  76 ++++++++
 .../src/test/resources/rules/invalid.yaml          |  57 ++++++
 .../src/test/resources/rules/malformed.yaml        |  74 ++++++++
 .../src/test/resources/rules/unexpected.yaml       |  83 +++++++++
 .../src/test/resources/rules/vanilla.yaml          |  70 ++++++++
 16 files changed, 827 insertions(+), 9 deletions(-)

diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 5d00d5394..efb4e79d2 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -42,6 +42,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -55,6 +58,8 @@ import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
 /** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
 public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
 
+    private static final String TOP_LEVEL_NAME = "top-level";
+
     // Parent node keys
     private static final String SOURCE_KEY = "source";
     private static final String SINK_KEY = "sink";
@@ -118,6 +123,11 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
 
     private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
             throws Exception {
+        validateJsonNodeKeys(
+                TOP_LEVEL_NAME,
+                pipelineDefJsonNode,
+                Arrays.asList(SOURCE_KEY, SINK_KEY),
+                Arrays.asList(ROUTE_KEY, TRANSFORM_KEY, PIPELINE_KEY));
 
         // UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
         // it's not of plain data types and must be removed before calling toPipelineConfig.
@@ -126,10 +136,12 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         if (pipelineDefJsonNode.get(PIPELINE_KEY) != null) {
             Optional.ofNullable(
                             ((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
+                    .map(node -> validateArray("UDF", node))
                     .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
 
             Optional.ofNullable(
                             ((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(MODEL_KEY))
+                    .map(node -> validateArray("model", node))
                     .ifPresent(node -> modelDefs.addAll(parseModels(node)));
         }
 
@@ -159,6 +171,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         // Transforms are optional
         List<TransformDef> transformDefs = new ArrayList<>();
         Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
+                .map(node -> validateArray("transform", node))
                 .ifPresent(
                         node ->
                                 node.forEach(
@@ -167,6 +180,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         // Routes are optional
         List<RouteDef> routeDefs = new ArrayList<>();
         Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
+                .map(node -> validateArray("route", node))
                 .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
 
         // Merge user config into global config
@@ -247,6 +261,12 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     }
 
     private RouteDef toRouteDef(JsonNode routeNode) {
+        validateJsonNodeKeys(
+                "route",
+                routeNode,
+                Arrays.asList(ROUTE_SOURCE_TABLE_KEY, ROUTE_SINK_TABLE_KEY),
+                Arrays.asList(ROUTE_REPLACE_SYMBOL, ROUTE_DESCRIPTION_KEY));
+
         String sourceTable =
                 checkNotNull(
                                 routeNode.get(ROUTE_SOURCE_TABLE_KEY),
@@ -271,6 +291,12 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     }
 
     private UdfDef toUdfDef(JsonNode udfNode) {
+        validateJsonNodeKeys(
+                "UDF",
+                udfNode,
+                Arrays.asList(UDF_FUNCTION_NAME_KEY, UDF_CLASSPATH_KEY),
+                Collections.emptyList());
+
         String functionName =
                 checkNotNull(
                                 udfNode.get(UDF_FUNCTION_NAME_KEY),
@@ -288,6 +314,19 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     }
 
     private TransformDef toTransformDef(JsonNode transformNode) {
+        validateJsonNodeKeys(
+                "transform",
+                transformNode,
+                Collections.singletonList(TRANSFORM_SOURCE_TABLE_KEY),
+                Arrays.asList(
+                        TRANSFORM_PROJECTION_KEY,
+                        TRANSFORM_FILTER_KEY,
+                        TRANSFORM_PRIMARY_KEY_KEY,
+                        TRANSFORM_PARTITION_KEY_KEY,
+                        TRANSFORM_TABLE_OPTION_KEY,
+                        TRANSFORM_DESCRIPTION_KEY,
+                        TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY));
+
         String sourceTable =
                 checkNotNull(
                                 transformNode.get(TRANSFORM_SOURCE_TABLE_KEY),
@@ -377,4 +416,57 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         Map<String, String> properties = mapper.convertValue(modelNode, Map.class);
         return new ModelDef(name, model, properties);
     }
+
+    private void validateJsonNodeKeys(
+            String contextName,
+            JsonNode jsonNode,
+            List<String> requiredKeys,
+            List<String> optionalKeys)
+            throws IllegalArgumentException {
+        List<String> validKeys = new ArrayList<>(requiredKeys);
+        Set<String> presentedKeys = new HashSet<>();
+        validKeys.addAll(optionalKeys);
+
+        for (Iterator<String> it = jsonNode.fieldNames(); it.hasNext(); ) {
+            String key = it.next();
+            presentedKeys.add(key);
+            if (!validKeys.contains(key)) {
+                if (TOP_LEVEL_NAME.equals(contextName)) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Unexpected key `%s` in YAML top-level block.\n"
+                                            + "Allowed keys in this context are: %s\n"
+                                            + "Note: Flink configurations should be defined in \"Runtime Configurations\" instead of YAML scripts.",
+                                    key, validKeys));
+                } else {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Unexpected key `%s` in YAML %s block.\n"
+                                            + "Allowed keys in this context are: %s\n"
+                                            + "Note: option %s: %s is unexpected. It was silently ignored in previous versions, and probably should be removed.",
+                                    key, contextName, validKeys, key, jsonNode.get(key)));
+                }
+            }
+        }
+
+        for (String key : requiredKeys) {
+            if (!presentedKeys.contains(key)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Missing required field \"%s\" in %s configuration",
+                                key, contextName));
+            }
+        }
+    }
+
+    private JsonNode validateArray(String contextName, JsonNode jsonNode) {
+        if (jsonNode.isArray()) {
+            return jsonNode;
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "YAML %s block is expecting an array of children, but got an %s (%s). Perhaps you missed a dash prefix `-`?",
+                            contextName, jsonNode.getNodeType(), jsonNode));
+        }
+    }
 }
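
Note (illustrative, not part of the patch): validateArray above turns a common
indentation slip, an object where a list is expected, e.g.

    transform:
      source-table: foo.bar

into an actionable message:

    YAML transform block is expecting an array of children, but got an OBJECT ({"source-table":"foo.bar"}). Perhaps you missed a dash prefix `-`?

The same case is exercised by rules/malformed.yaml below.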
diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
index 5ff18c951..dec2c25dc 100644
--- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
+++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
@@ -60,8 +60,8 @@ pipeline:
   schema-operator.rpc-timeout: 1 h
   execution.runtime-mode: STREAMING
   model:
-    model-name: GET_EMBEDDING
-    class-name: OpenAIEmbeddingModel
-    openai.model: text-embedding-3-small
-    openai.host: https://xxxx
-    openai.apikey: abcd1234
+    - model-name: GET_EMBEDDING
+      class-name: OpenAIEmbeddingModel
+      openai.model: text-embedding-3-small
+      openai.host: https://xxxx
+      openai.apikey: abcd1234
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 2f7bc5d81..351bd76fd 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -183,7 +183,7 @@ class TransformE2eITCase extends PipelineTestEnvironment {
                                 + "\n"
                                 + "sink:\n"
                                 + "  type: values\n"
-                                + "route:\n"
+                                + "\n"
                                 + "transform:\n"
                                 + "  - source-table: %s.\\.*\n"
+                                + "    projection: ID, VERSION, 'Type-A' AS CATEGORY\n"
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
index 82a0a9ff3..bdf970ea6 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
@@ -217,7 +217,7 @@ class YamlJobMigrationITCase extends PipelineTestEnvironment {
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  parallelism: %d\n"
-                                + "use.legacy.json.format: true\n",
+                                + "  use.legacy.json.format: true\n",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MySqlContainer.MYSQL_PORT,
                         MYSQL_TEST_USER,
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/CheckStep.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/CheckStep.java
new file mode 100644
index 000000000..f7e6acb96
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/CheckStep.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+import java.util.List;
+
+/** A {@link SpecStep} to check expected output results. */
+class CheckStep implements SpecStep {
+    public List<String> jmLogs;
+    public List<String> tmLogs;
+
+    public CheckStep(List<String> jmLogs, List<String> tmLogs) {
+        this.jmLogs = jmLogs;
+        this.tmLogs = tmLogs;
+    }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/ExecStep.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/ExecStep.java
new file mode 100644
index 000000000..45bca3988
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/ExecStep.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+/** A {@link SpecStep} to execute SQL commands. */
+class ExecStep implements SpecStep {
+    public String sql;
+
+    public ExecStep(String sql) {
+        this.sql = sql;
+    }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/FlinkPipelineSpecsITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/FlinkPipelineSpecsITCase.java
new file mode 100644
index 000000000..b7ac0507c
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/FlinkPipelineSpecsITCase.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+import org.apache.flink.cdc.cli.utils.YamlParserUtils;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** YAML-spec based pipeline test cases. */
+public class FlinkPipelineSpecsITCase extends PipelineTestEnvironment {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineSpecsITCase.class);
+    private static final String MYSQL_CONTAINER_ALIAS = "mysql";
+
+    static Stream<Arguments> loadTestSpecs() throws IOException {
+        Path path = Paths.get("").toAbsolutePath().resolve("src/test/resources/rules");
+
+        try (Stream<Path> dependencyResources = Files.walk(path)) {
+            List<Path> p = dependencyResources.sorted().collect(Collectors.toList());
+            return p.stream()
+                    .filter(f -> f.getFileName().toString().endsWith(".yaml"))
+                    .map(FlinkPipelineSpecsITCase::parseSpec)
+                    .map(spec -> Arguments.of(spec.groupName, spec.specName, spec));
+        }
+    }
+
+    private final String databaseName = "spec_db_" + UUID.randomUUID().toString().substring(0, 8);
+    private final Function<String, String> dbNameFormatter =
+            s -> s.replace("$database$", databaseName);
+
+    private Connection getConnection(String databaseName) throws Exception {
+        return DriverManager.getConnection(
+                String.format(
+                        "jdbc:mysql://%s:%s/%s?allowMultiQueries=true",
+                        MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName),
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD);
+    }
+
+    @BeforeEach
+    void initializeDatabase() throws Exception {
+        try (Connection conn = getConnection("");
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
+        }
+    }
+
+    @AfterEach
+    void destroyDatabase() throws Exception {
+        try (Connection conn = getConnection("");
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("DROP DATABASE IF EXISTS " + databaseName);
+        }
+    }
+
+    @ParameterizedTest(name = "{0} :: {1}")
+    @MethodSource("loadTestSpecs")
+    void runTestSpec(String groupName, String specName, RuleSpec spec) throws Exception {
+        LOG.info("Running test: {} :: {}", groupName, specName);
+        for (SpecStep step : spec.steps) {
+            if (step instanceof ExecStep) {
+                ExecStep execStep = (ExecStep) step;
+                LOG.info("ExecStep: Executing SQL:\n{}", execStep.sql);
+                try (Connection conn = getConnection(databaseName);
+                        Statement stmt = conn.createStatement()) {
+                    stmt.execute(execStep.sql);
+                }
+            } else if (step instanceof SubmitStep) {
+                SubmitStep submitStep =
+                        ((SubmitStep) step)
+                                .substitute("$hostname$", MYSQL_CONTAINER_ALIAS)
+                                .substitute("$database$", databaseName)
+                                .substitute("$username$", MYSQL_TEST_USER)
+                                .substitute("$password$", MYSQL_TEST_PASSWORD);
+
+                if (submitStep.expectError.isEmpty()) {
+                    LOG.info("SubmitStep: Submitting YAML Job:\n{}", submitStep.yaml);
+                    submitPipelineJob(submitStep.yaml);
+                } else {
+                    LOG.info(
+                            "SubmitStep: Submitting YAML Job:\n{}\nwith expected error: {}",
+                            submitStep.yaml,
+                            submitStep.expectError);
+
+                    Throwable caughtError = null;
+                    try {
+                        submitPipelineJob(submitStep.yaml);
+                    } catch (Throwable t) {
+                        caughtError = t;
+                    }
+
+                    Assertions.assertThat(caughtError).isNotNull();
+                    for (String error : submitStep.expectError) {
+                        Assertions.assertThat(caughtError).hasStackTraceContaining(error);
+                    }
+                }
+            } else if (step instanceof CheckStep) {
+                CheckStep checkStep = (CheckStep) step;
+                LOG.info("CheckStep: Going to perform checking:");
+                LOG.info(" - Expected JM: {}", checkStep.jmLogs);
+                LOG.info(" - Expected TM: {}", checkStep.tmLogs);
+                if (!checkStep.jmLogs.isEmpty()) {
+                    validateResult(
+                            jobManagerConsumer,
+                            dbNameFormatter,
+                            checkStep.jmLogs.toArray(new String[0]));
+                }
+                if (!checkStep.tmLogs.isEmpty()) {
+                    validateResult(dbNameFormatter, checkStep.tmLogs.toArray(new String[0]));
+                }
+            } else {
+                Assertions.fail("Unexpected RuleSpec step: " + step);
+            }
+        }
+    }
+
+    private static RuleSpec parseSpec(Path yamlFile) {
+        Map<String, Object> spec;
+        try {
+            spec = YamlParserUtils.loadYamlFile(yamlFile.toFile());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        Assertions.assertThat(spec.get("steps")).isInstanceOf(List.class);
+        String groupName = (String) spec.get("group");
+        String specName = (String) spec.get("spec");
+        List<SpecStep> parsedSteps = new ArrayList<>();
+
+        List<Map<String, ?>> steps = (List<Map<String, ?>>) spec.get("steps");
+        for (Map<String, ?> step : steps) {
+            switch ((String) step.get("type")) {
+                case "exec":
+                    parsedSteps.add(new ExecStep((String) step.get("sql")));
+                    break;
+                case "submit":
+                    parsedSteps.add(
+                            new SubmitStep((String) step.get("yaml"), getLines(step.get("error"))));
+                    break;
+                case "check":
+                    List<String> jmLogs = getLines(step.get("jm"));
+                    List<String> tmLogs = getLines(step.get("tm"));
+                    parsedSteps.add(new CheckStep(jmLogs, tmLogs));
+                    break;
+            }
+        }
+        return new RuleSpec(groupName, specName, parsedSteps);
+    }
+
+    private static List<String> getLines(@Nullable Object input) {
+        return Optional.ofNullable(input)
+                .map(String.class::cast)
+                .map(s -> s.split("\n"))
+                .map(Arrays::asList)
+                .orElseGet(Collections::emptyList);
+    }
+}
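
Note (illustrative, not part of the patch): spec files under
src/test/resources/rules are plain YAML consumed by parseSpec above; a minimal
spec exercising all three step types might look like:

    group: Example
    spec: Minimal Spec
    steps:
      - type: exec
        sql: |
          CREATE TABLE t (id BIGINT NOT NULL PRIMARY KEY);
          INSERT INTO t VALUES (1);
      - type: submit
        yaml: |
          source:
            type: mysql
            hostname: $hostname$
            tables: $database$.\.*
            username: $username$
            password: $password$
          sink:
            type: values
          pipeline:
            parallelism: 1
      - type: check
        tm: |
          DataChangeEvent{tableId=$database$.t, before=[], after=[1], op=INSERT, meta=()}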
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/RuleSpec.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/RuleSpec.java
new file mode 100644
index 000000000..087749545
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/RuleSpec.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+import java.util.List;
+
+/** Describing how to test a YAML pipeline job. */
+public class RuleSpec {
+    public String groupName;
+    public String specName;
+    public List<SpecStep> steps;
+
+    public RuleSpec(String groupName, String specName, List<SpecStep> steps) {
+        this.groupName = groupName;
+        this.specName = specName;
+        this.steps = steps;
+    }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SpecStep.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SpecStep.java
new file mode 100644
index 000000000..8e53e8849
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SpecStep.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+/** A single step of {@link RuleSpec}. */
+public interface SpecStep {}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SubmitStep.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SubmitStep.java
new file mode 100644
index 000000000..435a98f16
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SubmitStep.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A {@link SpecStep} to submit YAML jobs. */
+class SubmitStep implements SpecStep {
+    public String yaml;
+    public List<String> expectError;
+
+    public SubmitStep(String yaml, List<String> expectError) {
+        this.yaml = yaml;
+        this.expectError = expectError;
+    }
+
+    public SubmitStep substitute(String key, String value) {
+        this.yaml = this.yaml.replace(key, value);
+        this.expectError =
+                this.expectError.stream()
+                        .map(l -> l.replace(key, value))
+                        .collect(Collectors.toList());
+
+        return this;
+    }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 914278bcb..e2b4f6fb3 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -419,8 +419,15 @@ public abstract class PipelineTestEnvironment extends TestLogger {
             } else {
                 LOG.error(execResult.getStderr());
                 throw new AssertionError(
-                        "Failed when submitting the pipeline job. Exit code: "
-                                + execResult.getExitCode());
+                        "Failed when submitting the pipeline job.\n"
+                                + "Exit code: "
+                                + execResult.getExitCode()
+                                + "\n"
+                                + "StdOut: "
+                                + execResult.getStdout()
+                                + "\n"
+                                + "StdErr: "
+                                + execResult.getStderr());
             }
         } catch (Exception e) {
             throw new RuntimeException(
@@ -469,6 +476,12 @@ public abstract class PipelineTestEnvironment extends TestLogger {
         }
     }
 
+    protected void validateResult(
+            ToStringConsumer consumer, Function<String, String> mapper, String... expectedEvents)
+            throws Exception {
+        validateResult(consumer, Stream.of(expectedEvents).map(mapper).toArray(String[]::new));
+    }
+
     protected void waitUntilSpecificEvent(String event) throws Exception {
         waitUntilSpecificEvent(taskManagerConsumer, event);
     }
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/incomplete.yaml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/incomplete.yaml
new file mode 100644
index 000000000..054ccd9fc
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/incomplete.yaml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Incomplete Structures
+steps:
+  # Missing sink
+  - type: submit
+    yaml: |
+      source:
+        type: mysql
+        hostname: $hostname$
+        tables: $database$.\.*
+        username: $username$
+        password: $password$
+    error: Missing required field "sink" in top-level configuration
+  # Missing source
+  - type: submit
+    yaml: |
+      sink:
+        type: values
+    error: Missing required field "source" in top-level configuration
+  # Missing required options
+  - type: submit
+    yaml: |
+      source:
+        type: mysql
+      sink:
+        type: values
+    error: |
+      Missing required options are:
+      hostname
+      password
+      tables
+      username
+  # Missing transform components
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      transform:
+        - projection: \*
+    error: Missing required field "source-table" in transform configuration
+  # Missing routing components
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      route:
+        - sink-table: foo.bar.baz
+    error: Missing required field "source-table" in route configuration
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      route:
+        - source-table: foo.bar.baz
+    error: Missing required field "sink-table" in route configuration
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/invalid.yaml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/invalid.yaml
new file mode 100644
index 000000000..e52d1e0f4
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/invalid.yaml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Invalid Cases
+steps:
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      execution:
+        execution.coordinator-close-timeout: 300000
+        checkpointing:
+          interval: 600000
+          timeout: 30000
+          max-concurrent-checkpoints: 1
+          min-pause-between-checkpoints: 300000
+    error: |
+      Unexpected key `execution` in YAML top-level block.
+      Allowed keys in this context are: [source, sink, route, transform, pipeline]
+      Note: Flink configurations should be defined in "Runtime Configurations" instead of YAML scripts.
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      route:
+        - source-table: user_course.\.*
+          sink-table: ods.ods_user_course_<>
+          replace-symbol: <>
+          description: sync table to one destination table with given prefix ods_
+          transformers:
+            - type: cast
+              fields:
+                - name: "*"
+                  type: BIT
+                  target-type: TINYINT
+    error: |
+      Unexpected key `transformers` in YAML route block.
+      Allowed keys in this context are: [source-table, sink-table, replace-symbol, description]
+      Note: option transformers: [{"type":"cast","fields":[{"name":"*","type":"BIT","target-type":"TINYINT"}]}] is unexpected. It was silently ignored in previous versions, and probably should be removed.
\ No newline at end of file
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/malformed.yaml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/malformed.yaml
new file mode 100644
index 000000000..ffed7ad9b
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/malformed.yaml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Malformed Structures
+steps:
+  # Transform rules not an array
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      transform:
+        # ERROR: should be `- source-table: foo.bar`
+        source-table: foo.bar
+    error: |
+      YAML transform block is expecting an array of children, but got an OBJECT ({"source-table":"foo.bar"}).
+      Perhaps you missed a dash prefix `-`?
+  # Route rules not an array
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      route:
+        source-table: foo.bar
+    error: |
+      YAML route block is expecting an array of children, but got an OBJECT ({"source-table":"foo.bar"}).
+      Perhaps you missed a dash prefix `-`?
+  # UDF functions not an array
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      pipeline:
+        user-defined-function:
+          name: addone
+          classpath: org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass
+    error: |
+      YAML UDF block is expecting an array of children, but got an OBJECT ({"name":"addone","classpath":"org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass"}).
+      Perhaps you missed a dash prefix `-`?
+  # Models not an array
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      pipeline:
+        model:
+          model-name: GET_EMBEDDING
+          class-name: OpenAIEmbeddingModel
+          openai.model: text-embedding-3-small
+          openai.host: https://xxxx
+          openai.apikey: abcd1234
+    error: |
+      YAML model block is expecting an array of children, but got an OBJECT ({"model-name":"GET_EMBEDDING","class-name":"OpenAIEmbeddingModel","openai.model":"text-embedding-3-small","openai.host":"https://xxxx","openai.apikey":"abcd1234"}).
+      Perhaps you missed a dash prefix `-`?
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
new file mode 100644
index 000000000..293f76cd4
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Unexpected Structures
+steps:
+  # Unexpected top-level block
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      mapping:
+        - source: userUuid
+          sink: uuid
+        - source: userId
+          sink: user_id
+        - source: deviceUuid
+          sink: device_uuid
+    error: |
+      Unexpected key `mapping` in YAML top-level block.
+      Allowed keys in this context are: [source, sink, route, transform, pipeline]
+      Note: Flink configurations should be defined in "Runtime Configurations" instead of YAML scripts.
+  # Unexpected transform block keys
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      transform:
+        - source-table: foo.bar
+          mapping:
+            - source: userUuid
+              sink: uuid
+    error: |
+      Unexpected key `mapping` in YAML transform block.
+      Allowed keys in this context are: [source-table, projection, filter, primary-keys, partition-keys, table-options, description, converter-after-transform]
+      Note: option mapping: [{"source":"userUuid","sink":"uuid"}] is unexpected. It was silently ignored in previous versions, and probably should be removed.
+  # Unexpected route block keys
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      route:
+        - source-table: foo.bar
+          sink-table: fun.baz
+          regex-style: java
+    error: |
+      Unexpected key `regex-style` in YAML route block.
+      Allowed keys in this context are: [source-table, sink-table, replace-symbol, description]
+      Note: option regex-style: "java" is unexpected. It was silently ignored in previous versions, and probably should be removed.
+  # Unexpected UDF block keys
+  - type: submit
+    yaml: |
+      source:
+        type: values
+      sink:
+        type: values
+      pipeline:
+        user-defined-function:
+          - name: addone
+            classpath: org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass
+            language: clojure
+    error: |
+      Unexpected key `language` in YAML UDF block.
+      Allowed keys in this context are: [name, classpath]
+      Note: option language: "clojure" is unexpected. It was silently ignored in previous versions, and probably should be removed.
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/vanilla.yaml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/vanilla.yaml
new file mode 100644
index 000000000..3f10fd03e
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/vanilla.yaml
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Vanilla Test Specs
+steps:
+  - type: exec
+    sql: |
+      CREATE TABLE foo (idx BIGINT NOT NULL PRIMARY KEY, name VARCHAR(17));
+      INSERT INTO foo VALUES
+        (1, 'Alice'),
+        (2, 'Brett'),
+        (3, 'Cicada'),
+        (4, 'Derrida'),
+        (5, 'Eve');
+  - type: submit
+    yaml: |
+      source:
+        type: mysql
+        hostname: $hostname$
+        tables: $database$.\.*
+        username: $username$
+        password: $password$
+      sink:
+        type: values
+      pipeline:
+        parallelism: 1
+  - type: check
+    jm: |
+      Successfully applied schema change event CreateTableEvent{tableId=$database$.foo, schema=columns={`idx` BIGINT NOT NULL,`name` VARCHAR(17)}, primaryKeys=idx, options=()} to external system.
+    tm: |
+      DataChangeEvent{tableId=$database$.foo, before=[], after=[1, Alice], op=INSERT, meta=()}
+      DataChangeEvent{tableId=$database$.foo, before=[], after=[2, Brett], op=INSERT, meta=()}
+      DataChangeEvent{tableId=$database$.foo, before=[], after=[3, Cicada], op=INSERT, meta=()}
+      DataChangeEvent{tableId=$database$.foo, before=[], after=[4, Derrida], op=INSERT, meta=()}
+      DataChangeEvent{tableId=$database$.foo, before=[], after=[5, Eve], op=INSERT, meta=()}
+  - type: exec
+    sql: |
+      ALTER TABLE foo ADD COLUMN (digits DOUBLE);
+      INSERT INTO foo VALUES
+        (6, 'Faye', 3.14),
+        (7, 'Gem', 2.71828),
+        (8, 'Helen', 1.414);
+  - type: check
+    jm: |
+      Successfully applied schema change event AddColumnEvent{tableId=$database$.foo, addedColumns=[ColumnWithPosition{column=`digits` DOUBLE, position=LAST, existedColumnName=null}]} to external system.
+    tm: |
+      DataChangeEvent{tableId=$database$.foo, before=[], after=[6, Faye, 3.14], op=INSERT, meta=()}
+      DataChangeEvent{tableId=$database$.foo, before=[], after=[7, Gem, 2.71828], op=INSERT, meta=()}
+      DataChangeEvent{tableId=$database$.foo, before=[], after=[8, Helen, 1.414], op=INSERT, meta=()}
+  - type: exec
+    sql: |
+      UPDATE foo SET digits = 42 WHERE idx < 4;
+  - type: check
+    tm: |
+      DataChangeEvent{tableId=$database$.foo, before=[1, Alice, null], after=[1, Alice, 42.0], op=UPDATE, meta=()}
+      DataChangeEvent{tableId=$database$.foo, before=[2, Brett, null], after=[2, Brett, 42.0], op=UPDATE, meta=()}
+      DataChangeEvent{tableId=$database$.foo, before=[3, Cicada, null], after=[3, Cicada, 42.0], op=UPDATE, meta=()}

