This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 8edc345b1 [FLINK-35633] Add more YAML structure validations & provide
better error messages (#4172)
8edc345b1 is described below
commit 8edc345b1a1cc34d542bc9ca69c30ae1082bbf18
Author: yuxiqian <[email protected]>
AuthorDate: Tue Dec 2 20:04:17 2025 +0800
[FLINK-35633] Add more YAML structure validations & provide better error
messages (#4172)
---
.../cli/parser/YamlPipelineDefinitionParser.java | 92 ++++++++++
.../definitions/pipeline-definition-full.yaml | 10 +-
.../cdc/pipeline/tests/TransformE2eITCase.java | 2 +-
.../tests/migration/YamlJobMigrationITCase.java | 2 +-
.../flink/cdc/pipeline/tests/specs/CheckStep.java | 31 ++++
.../flink/cdc/pipeline/tests/specs/ExecStep.java | 27 +++
.../tests/specs/FlinkPipelineSpecsITCase.java | 199 +++++++++++++++++++++
.../flink/cdc/pipeline/tests/specs/RuleSpec.java | 33 ++++
.../flink/cdc/pipeline/tests/specs/SpecStep.java | 21 +++
.../flink/cdc/pipeline/tests/specs/SubmitStep.java | 42 +++++
.../tests/utils/PipelineTestEnvironment.java | 17 +-
.../src/test/resources/rules/incomplete.yaml | 76 ++++++++
.../src/test/resources/rules/invalid.yaml | 57 ++++++
.../src/test/resources/rules/malformed.yaml | 74 ++++++++
.../src/test/resources/rules/unexpected.yaml | 83 +++++++++
.../src/test/resources/rules/vanilla.yaml | 70 ++++++++
16 files changed, 827 insertions(+), 9 deletions(-)
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 5d00d5394..efb4e79d2 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -42,6 +42,9 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -55,6 +58,8 @@ import static
org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
/** Parser for converting YAML formatted pipeline definition to {@link
PipelineDef}. */
public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
+ private static final String TOP_LEVEL_NAME = "top-level";
+
// Parent node keys
private static final String SOURCE_KEY = "source";
private static final String SINK_KEY = "sink";
@@ -118,6 +123,11 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration
globalPipelineConfig)
throws Exception {
+ validateJsonNodeKeys(
+ TOP_LEVEL_NAME,
+ pipelineDefJsonNode,
+ Arrays.asList(SOURCE_KEY, SINK_KEY),
+ Arrays.asList(ROUTE_KEY, TRANSFORM_KEY, PIPELINE_KEY));
// UDFs are optional. We parse UDF first and remove it from the
pipelineDefJsonNode since
// it's not of plain data types and must be removed before calling
toPipelineConfig.
@@ -126,10 +136,12 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
if (pipelineDefJsonNode.get(PIPELINE_KEY) != null) {
Optional.ofNullable(
((ObjectNode)
pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
+ .map(node -> validateArray("UDF", node))
.ifPresent(node -> node.forEach(udf ->
udfDefs.add(toUdfDef(udf))));
Optional.ofNullable(
((ObjectNode)
pipelineDefJsonNode.get(PIPELINE_KEY)).remove(MODEL_KEY))
+ .map(node -> validateArray("model", node))
.ifPresent(node -> modelDefs.addAll(parseModels(node)));
}
@@ -159,6 +171,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
+ .map(node -> validateArray("transform", node))
.ifPresent(
node ->
node.forEach(
@@ -167,6 +180,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
// Routes are optional
List<RouteDef> routeDefs = new ArrayList<>();
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
+ .map(node -> validateArray("route", node))
.ifPresent(node -> node.forEach(route ->
routeDefs.add(toRouteDef(route))));
// Merge user config into global config
@@ -247,6 +261,12 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
}
private RouteDef toRouteDef(JsonNode routeNode) {
+ validateJsonNodeKeys(
+ "route",
+ routeNode,
+ Arrays.asList(ROUTE_SOURCE_TABLE_KEY, ROUTE_SINK_TABLE_KEY),
+ Arrays.asList(ROUTE_REPLACE_SYMBOL, ROUTE_DESCRIPTION_KEY));
+
String sourceTable =
checkNotNull(
routeNode.get(ROUTE_SOURCE_TABLE_KEY),
@@ -271,6 +291,12 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
}
private UdfDef toUdfDef(JsonNode udfNode) {
+ validateJsonNodeKeys(
+ "UDF",
+ udfNode,
+ Arrays.asList(UDF_FUNCTION_NAME_KEY, UDF_CLASSPATH_KEY),
+ Collections.emptyList());
+
String functionName =
checkNotNull(
udfNode.get(UDF_FUNCTION_NAME_KEY),
@@ -288,6 +314,19 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
}
private TransformDef toTransformDef(JsonNode transformNode) {
+ validateJsonNodeKeys(
+ "transform",
+ transformNode,
+ Collections.singletonList(TRANSFORM_SOURCE_TABLE_KEY),
+ Arrays.asList(
+ TRANSFORM_PROJECTION_KEY,
+ TRANSFORM_FILTER_KEY,
+ TRANSFORM_PRIMARY_KEY_KEY,
+ TRANSFORM_PARTITION_KEY_KEY,
+ TRANSFORM_TABLE_OPTION_KEY,
+ TRANSFORM_DESCRIPTION_KEY,
+ TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY));
+
String sourceTable =
checkNotNull(
transformNode.get(TRANSFORM_SOURCE_TABLE_KEY),
@@ -377,4 +416,57 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
Map<String, String> properties = mapper.convertValue(modelNode,
Map.class);
return new ModelDef(name, model, properties);
}
+
+ /**
+ * Checks that {@code jsonNode} only contains keys from {@code requiredKeys} and
+ * {@code optionalKeys}, and that every key in {@code requiredKeys} is present.
+ *
+ * <p>Unexpected keys are reported before missing ones. For the top-level block the message
+ * hints that Flink configurations belong in "Runtime Configurations"; for any other block it
+ * echoes the offending option and its value, noting it was silently ignored in previous
+ * versions.
+ *
+ * <p>NOTE(review): the option value is printed verbatim via {@code jsonNode.get(key)}, so a
+ * misplaced secret (e.g. a password nested under the wrong block) ends up in the exception
+ * message and logs.
+ *
+ * @param contextName block name used in error messages ({@code TOP_LEVEL_NAME} for the root)
+ * @param jsonNode node whose field names are validated
+ * @param requiredKeys keys that must be present
+ * @param optionalKeys keys that may additionally be present
+ * @throws IllegalArgumentException on the first unexpected key, or else the first missing key
+ */
+ private void validateJsonNodeKeys(
+ String contextName,
+ JsonNode jsonNode,
+ List<String> requiredKeys,
+ List<String> optionalKeys)
+ throws IllegalArgumentException {
+ List<String> validKeys = new ArrayList<>(requiredKeys);
+ // Field names seen while scanning; reused by the required-key check below.
+ Set<String> presentedKeys = new HashSet<>();
+ validKeys.addAll(optionalKeys);
+
+ for (Iterator<String> it = jsonNode.fieldNames(); it.hasNext(); ) {
+ String key = it.next();
+ presentedKeys.add(key);
+ if (!validKeys.contains(key)) {
+ // Unknown keys at the root are usually Flink options pasted into the YAML script.
+ if (TOP_LEVEL_NAME.equals(contextName)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unexpected key `%s` in YAML top-level
block.\n"
+ + "Allowed keys in this context
are: %s\n"
+ + "Note: Flink configurations
should be defined in \"Runtime Configurations\" instead of YAML scripts.",
+ key, validKeys));
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unexpected key `%s` in YAML %s block.\n"
+ + "Allowed keys in this context
are: %s\n"
+ + "Note: option %s: %s is
unexpected. It was silently ignored in previous versions, and probably should
be removed.",
+ key, contextName, validKeys, key,
jsonNode.get(key)));
+ }
+ }
+ }
+
+ // Only reached when every present key is allowed; now enforce the mandatory ones.
+ for (String key : requiredKeys) {
+ if (!presentedKeys.contains(key)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Missing required field \"%s\" in %s
configuration",
+ key, contextName));
+ }
+ }
+ }
+
+ /**
+ * Returns {@code jsonNode} unchanged if it is an array, so the call can be chained inside
+ * {@code Optional.map(...)}; otherwise fails with a hint about a missing {@code -} prefix.
+ *
+ * @param contextName block name used in the error message, e.g. "route" or "UDF"
+ * @param jsonNode node that is expected to be a YAML sequence
+ * @return {@code jsonNode} itself
+ * @throws IllegalArgumentException if {@code jsonNode} is not an array node; the message
+ * includes the actual node type and content
+ */
+ private JsonNode validateArray(String contextName, JsonNode jsonNode) {
+ if (jsonNode.isArray()) {
+ return jsonNode;
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "YAML %s block is expecting an array children, but
got an %s (%s). Perhaps you missed a dash prefix `-`?",
+ contextName, jsonNode.getNodeType(), jsonNode));
+ }
+ }
}
diff --git
a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
index 5ff18c951..dec2c25dc 100644
--- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
+++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
@@ -60,8 +60,8 @@ pipeline:
schema-operator.rpc-timeout: 1 h
execution.runtime-mode: STREAMING
model:
- model-name: GET_EMBEDDING
- class-name: OpenAIEmbeddingModel
- openai.model: text-embedding-3-small
- openai.host: https://xxxx
- openai.apikey: abcd1234
+ - model-name: GET_EMBEDDING
+ class-name: OpenAIEmbeddingModel
+ openai.model: text-embedding-3-small
+ openai.host: https://xxxx
+ openai.apikey: abcd1234
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 2f7bc5d81..351bd76fd 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -183,7 +183,7 @@ class TransformE2eITCase extends PipelineTestEnvironment {
+ "\n"
+ "sink:\n"
+ " type: values\n"
- + "route:\n"
+ + "\n"
+ "transform:\n"
+ " - source-table: %s.\\.*\n"
+ " projection: ID, VERSION, 'Type-A' AS
CATEGORY\n"
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
index 82a0a9ff3..bdf970ea6 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
@@ -217,7 +217,7 @@ class YamlJobMigrationITCase extends
PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " parallelism: %d\n"
- + "use.legacy.json.format: true\n",
+ + " use.legacy.json.format: true\n",
INTER_CONTAINER_MYSQL_ALIAS,
MySqlContainer.MYSQL_PORT,
MYSQL_TEST_USER,
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/CheckStep.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/CheckStep.java
new file mode 100644
index 000000000..f7e6acb96
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/CheckStep.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+import java.util.List;
+
+/**
+ * A {@link SpecStep} to check expected output results: lines expected in the JobManager output
+ * ({@link #jmLogs}) and in the TaskManager output ({@link #tmLogs}). An empty list skips the
+ * corresponding check; {@code $database$} placeholders are resolved by the spec runner.
+ */
+class CheckStep implements SpecStep {
+ public List<String> jmLogs; // expected JobManager output lines
+ public List<String> tmLogs; // expected TaskManager output lines
+
+ public CheckStep(List<String> jmLogs, List<String> tmLogs) {
+ this.jmLogs = jmLogs;
+ this.tmLogs = tmLogs;
+ }
+}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/ExecStep.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/ExecStep.java
new file mode 100644
index 000000000..45bca3988
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/ExecStep.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+/**
+ * A {@link SpecStep} to execute SQL commands.
+ *
+ * <p>The whole {@link #sql} text is passed to a single JDBC {@code Statement.execute} call by the
+ * spec runner, whose connection URL enables {@code allowMultiQueries}.
+ */
+class ExecStep implements SpecStep {
+ public String sql; // SQL text taken from the step's `sql` key
+
+ public ExecStep(String sql) {
+ this.sql = sql;
+ }
+}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/FlinkPipelineSpecsITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/FlinkPipelineSpecsITCase.java
new file mode 100644
index 000000000..b7ac0507c
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/FlinkPipelineSpecsITCase.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+import org.apache.flink.cdc.cli.utils.YamlParserUtils;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * YAML-spec based pipeline test cases.
+ *
+ * <p>Every {@code *.yaml} file under {@code src/test/resources/rules} describes one {@link
+ * RuleSpec}: a {@code group} name, a {@code spec} name and a list of {@code steps}. Each step has
+ * a {@code type} of {@code exec} (run SQL against the per-test MySQL database), {@code submit}
+ * (submit a YAML pipeline job, optionally expecting it to fail) or {@code check} (validate
+ * JobManager / TaskManager output).
+ */
+public class FlinkPipelineSpecsITCase extends PipelineTestEnvironment {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineSpecsITCase.class);
+    private static final String MYSQL_CONTAINER_ALIAS = "mysql";
+
+    /** Loads every {@code .yaml} rule spec, sorted by path so that test order is stable. */
+    static Stream<Arguments> loadTestSpecs() throws IOException {
+        Path path = Paths.get("").toAbsolutePath().resolve("src/test/resources/rules");
+
+        try (Stream<Path> files = Files.walk(path)) {
+            // Materialize the listing before the walk stream is closed.
+            List<Path> specFiles = files.sorted().collect(Collectors.toList());
+            return specFiles.stream()
+                    .filter(f -> f.getFileName().toString().endsWith(".yaml"))
+                    .map(FlinkPipelineSpecsITCase::parseSpec)
+                    .map(spec -> Arguments.of(spec.groupName, spec.specName, spec));
+        }
+    }
+
+    private final String databaseName = "spec_db_" + UUID.randomUUID().toString().substring(0, 8);
+    private final Function<String, String> dbNameFormatter =
+            s -> s.replace("$database$", databaseName);
+
+    /** Opens a JDBC connection to {@code dbName} ({@code ""} for none) on the MySQL container. */
+    private Connection getConnection(String dbName) throws Exception {
+        return DriverManager.getConnection(
+                String.format(
+                        "jdbc:mysql://%s:%s/%s?allowMultiQueries=true",
+                        MYSQL.getHost(), MYSQL.getDatabasePort(), dbName),
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD);
+    }
+
+    @BeforeEach
+    void initializeDatabase() throws Exception {
+        try (Connection conn = getConnection("");
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE DATABASE IF NOT EXISTS " + databaseName);
+        }
+    }
+
+    @AfterEach
+    void destroyDatabase() throws Exception {
+        try (Connection conn = getConnection("");
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("DROP DATABASE IF EXISTS " + databaseName);
+        }
+    }
+
+    @ParameterizedTest(name = "{0} :: {1}")
+    @MethodSource("loadTestSpecs")
+    void runTestSpec(String groupName, String specName, RuleSpec spec) throws Exception {
+        LOG.info("Running test: {} :: {}", groupName, specName);
+        for (SpecStep step : spec.steps) {
+            if (step instanceof ExecStep) {
+                runExecStep((ExecStep) step);
+            } else if (step instanceof SubmitStep) {
+                runSubmitStep((SubmitStep) step);
+            } else if (step instanceof CheckStep) {
+                runCheckStep((CheckStep) step);
+            } else {
+                Assertions.fail("Unexpected RuleSpec step: " + step);
+            }
+        }
+    }
+
+    /** Executes the step's SQL against this test's database. */
+    private void runExecStep(ExecStep execStep) throws Exception {
+        LOG.info("ExecStep: Executing SQL:\n{}", execStep.sql);
+        try (Connection conn = getConnection(databaseName);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(execStep.sql);
+        }
+    }
+
+    /**
+     * Resolves placeholders and submits the step's YAML job. If the step lists expected errors,
+     * the submission must fail and every listed fragment must appear in the failure stack trace.
+     */
+    private void runSubmitStep(SubmitStep step) throws Exception {
+        SubmitStep submitStep =
+                step.substitute("$hostname$", MYSQL_CONTAINER_ALIAS)
+                        .substitute("$database$", databaseName)
+                        .substitute("$username$", MYSQL_TEST_USER)
+                        .substitute("$password$", MYSQL_TEST_PASSWORD);
+
+        if (submitStep.expectError.isEmpty()) {
+            LOG.info("SubmitStep: Submitting YAML Job:\n{}", submitStep.yaml);
+            submitPipelineJob(submitStep.yaml);
+            return;
+        }
+
+        LOG.info(
+                "SubmitStep: Submitting YAML Job:\n{}\nwith expected error: {}",
+                submitStep.yaml,
+                submitStep.expectError);
+
+        Throwable caughtError = null;
+        try {
+            submitPipelineJob(submitStep.yaml);
+        } catch (Throwable t) {
+            caughtError = t;
+        }
+
+        // Describe the expectation so an unexpectedly successful submission is easy to diagnose.
+        Assertions.assertThat(caughtError)
+                .as("Expected job submission to fail with: %s", submitStep.expectError)
+                .isNotNull();
+        for (String error : submitStep.expectError) {
+            Assertions.assertThat(caughtError).hasStackTraceContaining(error);
+        }
+    }
+
+    /** Validates expected JobManager and TaskManager output; empty expectations are skipped. */
+    private void runCheckStep(CheckStep checkStep) throws Exception {
+        LOG.info("CheckStep: Going to perform checking:");
+        LOG.info(" - Expected JM: {}", checkStep.jmLogs);
+        LOG.info(" - Expected TM: {}", checkStep.tmLogs);
+        if (!checkStep.jmLogs.isEmpty()) {
+            validateResult(
+                    jobManagerConsumer,
+                    dbNameFormatter,
+                    checkStep.jmLogs.toArray(new String[0]));
+        }
+        if (!checkStep.tmLogs.isEmpty()) {
+            validateResult(dbNameFormatter, checkStep.tmLogs.toArray(new String[0]));
+        }
+    }
+
+    /**
+     * Parses one rule spec file into a {@link RuleSpec}.
+     *
+     * @throws RuntimeException if the file cannot be loaded
+     * @throws IllegalArgumentException if a step has a missing or unknown {@code type}
+     */
+    private static RuleSpec parseSpec(Path yamlFile) {
+        Map<String, Object> spec;
+        try {
+            spec = YamlParserUtils.loadYamlFile(yamlFile.toFile());
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load rule spec " + yamlFile, e);
+        }
+        Assertions.assertThat(spec.get("steps")).isInstanceOf(List.class);
+        String groupName = (String) spec.get("group");
+        String specName = (String) spec.get("spec");
+        List<SpecStep> parsedSteps = new ArrayList<>();
+
+        @SuppressWarnings("unchecked") // shape is asserted above; elements are YAML mappings
+        List<Map<String, ?>> steps = (List<Map<String, ?>>) spec.get("steps");
+        for (Map<String, ?> step : steps) {
+            // String.valueOf turns a missing `type` into "null", which reaches the default
+            // branch below instead of throwing an uninformative NullPointerException.
+            String type = String.valueOf(step.get("type"));
+            switch (type) {
+                case "exec":
+                    parsedSteps.add(new ExecStep((String) step.get("sql")));
+                    break;
+                case "submit":
+                    parsedSteps.add(
+                            new SubmitStep(
+                                    (String) step.get("yaml"), getLines(step.get("error"))));
+                    break;
+                case "check":
+                    List<String> jmLogs = getLines(step.get("jm"));
+                    List<String> tmLogs = getLines(step.get("tm"));
+                    parsedSteps.add(new CheckStep(jmLogs, tmLogs));
+                    break;
+                default:
+                    // Silently skipping an unrecognized step would let a typo drop e.g. a
+                    // `check` step and make the spec pass vacuously.
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Unknown step type `%s` in rule spec %s. "
+                                            + "Expected one of: exec, submit, check.",
+                                    type, yamlFile));
+            }
+        }
+        return new RuleSpec(groupName, specName, parsedSteps);
+    }
+
+    /** Splits a multi-line YAML string into lines; {@code null} yields an empty list. */
+    private static List<String> getLines(@Nullable Object input) {
+        return Optional.ofNullable(input)
+                .map(String.class::cast)
+                .map(s -> s.split("\n"))
+                .map(Arrays::asList)
+                .orElseGet(Collections::emptyList);
+    }
+}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/RuleSpec.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/RuleSpec.java
new file mode 100644
index 000000000..087749545
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/RuleSpec.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+import java.util.List;
+
+/**
+ * Describes how to test a YAML pipeline job: a named sequence of {@link SpecStep}s that the spec
+ * runner executes in order.
+ */
+public class RuleSpec {
+ public String groupName; // from the spec file's `group` key
+ public String specName; // from the spec file's `spec` key
+ public List<SpecStep> steps; // from the spec file's `steps` list, in declaration order
+
+ public RuleSpec(String groupName, String specName, List<SpecStep> steps) {
+ this.groupName = groupName;
+ this.specName = specName;
+ this.steps = steps;
+ }
+}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SpecStep.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SpecStep.java
new file mode 100644
index 000000000..8e53e8849
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SpecStep.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+/**
+ * A single step of {@link RuleSpec}. Marker interface; the implementations in this package are
+ * ExecStep, SubmitStep and CheckStep.
+ */
+public interface SpecStep {}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SubmitStep.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SubmitStep.java
new file mode 100644
index 000000000..435a98f16
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/specs/SubmitStep.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.specs;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SpecStep} to submit YAML jobs, optionally expecting the submission to fail.
+ */
+class SubmitStep implements SpecStep {
+    /** Pipeline definition to submit; may contain placeholders such as {@code $database$}. */
+    public String yaml;
+
+    /**
+     * Fragments that must each appear in the stack trace of the submission failure. An empty
+     * list means the submission is expected to succeed.
+     */
+    public List<String> expectError;
+
+    public SubmitStep(String yaml, List<String> expectError) {
+        this.yaml = yaml;
+        this.expectError = expectError;
+    }
+
+    /**
+     * Replaces every literal occurrence of {@code key} with {@code value}, both in the YAML
+     * definition and in the expected error fragments.
+     *
+     * <p>Uses {@link String#replace(CharSequence, CharSequence)} rather than {@code replaceAll}.
+     * Placeholders like {@code $hostname$} are delimited by {@code $}, which is an end-of-input
+     * anchor in regular expressions and a group reference in replacement strings. A regex
+     * substitution would therefore leave them untouched, or throw when {@code value} contains
+     * {@code $}.
+     *
+     * @return this step, to allow chained substitutions
+     */
+    public SubmitStep substitute(String key, String value) {
+        this.yaml = this.yaml.replace(key, value);
+        this.expectError =
+                this.expectError.stream()
+                        .map(line -> line.replace(key, value))
+                        .collect(Collectors.toList());
+        return this;
+    }
+}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 914278bcb..e2b4f6fb3 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -419,8 +419,15 @@ public abstract class PipelineTestEnvironment extends
TestLogger {
} else {
LOG.error(execResult.getStderr());
throw new AssertionError(
- "Failed when submitting the pipeline job. Exit code: "
- + execResult.getExitCode());
+ "Failed when submitting the pipeline job.\n"
+ + "Exit code: "
+ + execResult.getExitCode()
+ + "\n"
+ + "StdOut: "
+ + execResult.getStdout()
+ + "\n"
+ + "StdErr: "
+ + execResult.getStderr());
}
} catch (Exception e) {
throw new RuntimeException(
@@ -469,6 +476,12 @@ public abstract class PipelineTestEnvironment extends
TestLogger {
}
}
+ /**
+ * Applies {@code mapper} to every expected event (e.g. to resolve a database-name placeholder)
+ * and then delegates to the {@code validateResult} overload that takes the consumer and the
+ * mapped events.
+ */
+ protected void validateResult(
+ ToStringConsumer consumer, Function<String, String> mapper,
String... expectedEvents)
+ throws Exception {
+ validateResult(consumer,
Stream.of(expectedEvents).map(mapper).toArray(String[]::new));
+ }
+
protected void waitUntilSpecificEvent(String event) throws Exception {
waitUntilSpecificEvent(taskManagerConsumer, event);
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/incomplete.yaml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/incomplete.yaml
new file mode 100644
index 000000000..054ccd9fc
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/incomplete.yaml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Incomplete Structures
+steps:
+ # Missing sink
+ - type: submit
+ yaml: |
+ source:
+ type: mysql
+ hostname: $hostname$
+ tables: $database$.\.*
+ username: $username$
+ password: $password$
+ error: Missing required field "sink" in top-level configuration
+ # Missing source
+ - type: submit
+ yaml: |
+ sink:
+ type: values
+ error: Missing required field "source" in top-level configuration
+ # Missing required options
+ - type: submit
+ yaml: |
+ source:
+ type: mysql
+ sink:
+ type: values
+ error: |
+ Missing required options are:
+ hostname
+ password
+ tables
+ username
+ # Missing transform components
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ transform:
+ - projection: \*
+ error: Missing required field "source-table" in transform configuration
+ # Missing routing components
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ route:
+ - sink-table: foo.bar.baz
+ error: Missing required field "source-table" in route configuration
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ route:
+ - source-table: foo.bar.baz
+ error: Missing required field "sink-table" in route configuration
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/invalid.yaml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/invalid.yaml
new file mode 100644
index 000000000..e52d1e0f4
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/invalid.yaml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Invalid Cases
+steps:
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ execution:
+ execution.coordinator-close-timeout: 300000
+ checkpointing:
+ interval: 600000
+ timeout: 30000
+ max-concurrent-checkpoints: 1
+ min-pause-between-checkpoints: 300000
+ error: |
+ Unexpected key `execution` in YAML top-level block.
+ Allowed keys in this context are: [source, sink, route, transform,
pipeline]
+ Note: Flink configurations should be defined in "Runtime Configurations"
instead of YAML scripts.
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ route:
+ - source-table: user_course.\.*
+ sink-table: ods.ods_user_course_<>
+ replace-symbol: <>
+ description: sync table to one destination table with given prefix
ods_
+ transformers:
+ - type: cast
+ fields:
+ - name: "*"
+ type: BIT
+ target-type: TINYINT
+ error: |
+ Unexpected key `transformers` in YAML route block.
+ Allowed keys in this context are: [source-table, sink-table,
replace-symbol, description]
+ Note: option transformers:
[{"type":"cast","fields":[{"name":"*","type":"BIT","target-type":"TINYINT"}]}]
is unexpected.
+ It was silently ignored in previous versions, and probably should be
removed.
\ No newline at end of file
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/malformed.yaml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/malformed.yaml
new file mode 100644
index 000000000..ffed7ad9b
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/malformed.yaml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Malformed Structures
+steps:
+ # Transform rules not an array
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ transform:
+ # ERROR: should be `- source-table: foo.bar`
+ source-table: foo.bar
+ error: |
+ YAML transform block is expecting an array children, but got an OBJECT
({"source-table":"foo.bar"}).
+ Perhaps you missed a dash prefix `-`?
+ # Route rules not an array
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ route:
+ source-table: foo.bar
+ error: |
+ YAML route block is expecting an array children, but got an OBJECT
({"source-table":"foo.bar"}).
+ Perhaps you missed a dash prefix `-`?
+ # UDF functions not an array
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ pipeline:
+ user-defined-function:
+ name: addone
+ classpath: org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass
+ error: |
+ YAML UDF block is expecting an array children, but got an OBJECT
({"name":"addone","classpath":"org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass"}).
+ Perhaps you missed a dash prefix `-`?
+ # Models not an array
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ pipeline:
+ model:
+ model-name: GET_EMBEDDING
+ class-name: OpenAIEmbeddingModel
+ openai.model: text-embedding-3-small
+ openai.host: https://xxxx
+ openai.apikey: abcd1234
+ error: |
+ YAML model block is expecting an array children, but got an OBJECT
({"model-name":"GET_EMBEDDING","class-name":"OpenAIEmbeddingModel","openai.model":"text-embedding-3-small","openai.host":"https://xxxx","openai.apikey":"abcd1234"}).
+ Perhaps you missed a dash prefix `-`?
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
new file mode 100644
index 000000000..293f76cd4
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Unexpected Structures
+steps:
+ # Unexpected top-level block
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ mapping:
+ - source: userUuid
+ sink: uuid
+ - source: userId
+ sink: user_id
+ - source: deviceUuid
+ sink: device_uuid
+ error: |
+ Unexpected key `mapping` in YAML top-level block.
+ Allowed keys in this context are: [source, sink, route, transform,
pipeline]
+ Note: Flink configurations should be defined in "Runtime Configurations"
instead of YAML scripts.
+ # Unexpected transform block keys
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ transform:
+ - source-table: foo.bar
+ mapping:
+ - source: userUuid
+ sink: uuid
+ error: |
+ Unexpected key `mapping` in YAML transform block.
+ Allowed keys in this context are: [source-table, projection, filter,
primary-keys, partition-keys, table-options, description,
converter-after-transform]
+ Note: option mapping: [{"source":"userUuid","sink":"uuid"}] is
unexpected. It was silently ignored in previous versions, and probably should
be removed.
+ # Unexpected route block keys
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ route:
+ - source-table: foo.bar
+ sink-table: fun.baz
+ regex-style: java
+ error: |
+ Unexpected key `regex-style` in YAML route block.
+ Allowed keys in this context are: [source-table, sink-table,
replace-symbol, description]
+ Note: option regex-style: "java" is unexpected. It was silently ignored
in previous versions, and probably should be removed.
+ # Unexpected UDF block keys
+ - type: submit
+ yaml: |
+ source:
+ type: values
+ sink:
+ type: values
+ pipeline:
+ user-defined-function:
+ - name: addone
+ classpath: org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass
+ language: clojure
+ error: |
+ Unexpected key `language` in YAML UDF block.
+ Allowed keys in this context are: [name, classpath]
+ Note: option language: "clojure" is unexpected. It was silently ignored
in previous versions, and probably should be removed.
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/vanilla.yaml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/vanilla.yaml
new file mode 100644
index 000000000..3f10fd03e
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/vanilla.yaml
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+group: Common
+spec: Vanilla Test Specs
+steps:
+ - type: exec
+ sql: |
+ CREATE TABLE foo (idx BIGINT NOT NULL PRIMARY KEY, name VARCHAR(17));
+ INSERT INTO foo VALUES
+ (1, 'Alice'),
+ (2, 'Brett'),
+ (3, 'Cicada'),
+ (4, 'Derrida'),
+ (5, 'Eve');
+ - type: submit
+ yaml: |
+ source:
+ type: mysql
+ hostname: $hostname$
+ tables: $database$.\.*
+ username: $username$
+ password: $password$
+ sink:
+ type: values
+ pipeline:
+ parallelism: 1
+ - type: check
+ jm: |
+ Successfully applied schema change event
CreateTableEvent{tableId=$database$.foo, schema=columns={`idx` BIGINT NOT
NULL,`name` VARCHAR(17)}, primaryKeys=idx, options=()} to external system.
+ tm: |
+ DataChangeEvent{tableId=$database$.foo, before=[], after=[1, Alice],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=$database$.foo, before=[], after=[2, Brett],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=$database$.foo, before=[], after=[3, Cicada],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=$database$.foo, before=[], after=[4, Derrida],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=$database$.foo, before=[], after=[5, Eve],
op=INSERT, meta=()}
+ - type: exec
+ sql: |
+ ALTER TABLE foo ADD COLUMN (digits DOUBLE);
+ INSERT INTO foo VALUES
+ (6, 'Faye', 3.14),
+ (7, 'Gem', 2.71828),
+ (8, 'Helen', 1.414);
+ - type: check
+ jm: |
+ Successfully applied schema change event
AddColumnEvent{tableId=$database$.foo,
addedColumns=[ColumnWithPosition{column=`digits` DOUBLE, position=LAST,
existedColumnName=null}]} to external system.
+ tm: |
+ DataChangeEvent{tableId=$database$.foo, before=[], after=[6, Faye,
3.14], op=INSERT, meta=()}
+ DataChangeEvent{tableId=$database$.foo, before=[], after=[7, Gem,
2.71828], op=INSERT, meta=()}
+ DataChangeEvent{tableId=$database$.foo, before=[], after=[8, Helen,
1.414], op=INSERT, meta=()}
+ - type: exec
+ sql: |
+ UPDATE foo SET digits = 42 WHERE idx < 4;
+ - type: check
+ tm: |
+ DataChangeEvent{tableId=$database$.foo, before=[1, Alice, null],
after=[1, Alice, 42.0], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=$database$.foo, before=[2, Brett, null],
after=[2, Brett, 42.0], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=$database$.foo, before=[3, Cicada, null],
after=[3, Cicada, 42.0], op=UPDATE, meta=()}