This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new de4be372f [hotfix][e2e] Fix pipeline e2e test cases failures (#3246)
de4be372f is described below

commit de4be372f8d553d3bb1610305a282a9be53634e3
Author: yux <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Wed Apr 24 14:03:22 2024 +0800

    [hotfix][e2e] Fix pipeline e2e test cases failures (#3246)
---
 docs/content.zh/docs/connectors/mysql.md           |  2 +-
 docs/content/docs/connectors/mysql.md              |  2 +-
 .../flink/cdc/cli/utils/ConfigurationUtils.java    |  8 ++++-
 .../flink/cdc/cli/utils/FlinkEnvironmentUtils.java |  2 +-
 .../flink/cdc/cli/utils/YamlParserUtils.java       | 38 ++++++++++++++++++++--
 .../cdc/cli/utils/ConfigurationUtilsTest.java      |  3 +-
 .../test/resources/flink-home/conf/flink-conf.yaml |  3 ++
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   |  3 +-
 .../tests/utils/PipelineTestEnvironment.java       |  7 +++-
 9 files changed, 58 insertions(+), 10 deletions(-)

diff --git a/docs/content.zh/docs/connectors/mysql.md b/docs/content.zh/docs/connectors/mysql.md
index 0fccd82f1..99b573df6 100644
--- a/docs/content.zh/docs/connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/mysql.md
@@ -31,7 +31,7 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
 ## 依赖配置
 
 由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。
-您可能需要手动配置以下依赖:
+您可能需要手动配置以下依赖,并在提交 YAML 作业时使用 Flink CDC CLI 的 `--jar` 参数将其传入:
 
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
diff --git a/docs/content/docs/connectors/mysql.md b/docs/content/docs/connectors/mysql.md
index d2878faba..a314daf45 100644
--- a/docs/content/docs/connectors/mysql.md
+++ b/docs/content/docs/connectors/mysql.md
@@ -32,7 +32,7 @@ This document describes how to setup the MySQL connector.
 ## Dependencies
 
 Since MySQL Connector's GPLv2 license is incompatible with Flink CDC project, we can't provide MySQL connector in prebuilt connector jar packages.
-You may need to configure the following dependencies manually.
+You may need to configure the following dependencies manually, and pass it with `--jar` argument of Flink CDC CLI when submitting YAML pipeline jobs.
 
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
index 38fa608f5..8bc4ba628 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -30,7 +30,13 @@ public class ConfigurationUtils {
     private static final String KEY_SEPARATOR = ".";
 
     public static Configuration loadConfigFile(Path configPath) throws Exception {
-        Map<String, Object> configMap = YamlParserUtils.loadYamlFile(configPath.toFile());
+        return loadConfigFile(configPath, false);
+    }
+
+    public static Configuration loadConfigFile(Path configPath, boolean allowDuplicateKeys)
+            throws Exception {
+        Map<String, Object> configMap =
+                YamlParserUtils.loadYamlFile(configPath.toFile(), allowDuplicateKeys);
         return Configuration.fromMap(flattenConfigMap(configMap, ""));
     }
 
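For context, here is a caller-side sketch of how the two overloads are meant to be used. This snippet is not part of the commit, the file paths are hypothetical, and the Configuration return type is assumed to come from flink-cdc-common; the single-argument variant keeps the strict default, while legacy flink-conf.yaml files are loaded with duplicate keys allowed, as FlinkEnvironmentUtils does below.

    // Hypothetical usage sketch; file paths are made up for illustration.
    import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
    import org.apache.flink.cdc.common.configuration.Configuration; // assumed package

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class LoadConfigDemo {
        public static void main(String[] args) throws Exception {
            // Strict default: duplicate keys in the YAML file are rejected.
            Path newStyleConf = Paths.get("/opt/flink-cdc/conf/some-config.yaml");
            Configuration strictConfig = ConfigurationUtils.loadConfigFile(newStyleConf);

            // Legacy flink-conf.yaml may repeat keys, so opt in to the lenient loader.
            Path legacyConf = Paths.get("/opt/flink/conf/flink-conf.yaml");
            Configuration lenientConfig = ConfigurationUtils.loadConfigFile(legacyConf, true);

            System.out.println(strictConfig);
            System.out.println(lenientConfig);
        }
    }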
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
index d250ea835..7c4a7acea 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
@@ -46,7 +46,7 @@ public class FlinkEnvironmentUtils {
                     FLINK_CONF_FILENAME,
                     LEGACY_FLINK_CONF_FILENAME);
             return ConfigurationUtils.loadConfigFile(
-                    flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME));
+                    flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME), true);
         }
     }
 
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/YamlParserUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/YamlParserUtils.java
index d47332b01..1e5889c4b 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/YamlParserUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/YamlParserUtils.java
@@ -82,7 +82,22 @@ public class YamlParserUtils {
             new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings));

     private static final Load loader =
-            new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());
+            new Load(
+                    LoadSettings.builder()
+                            .setSchema(new CoreSchema())
+                            .setAllowDuplicateKeys(false)
+                            .build());
+
+    private static final Load legacyLoader =
+            new Load(
+                    LoadSettings.builder()
+                            .setSchema(new CoreSchema())
+                            .setAllowDuplicateKeys(true)
+                            .build());
+
+    private static Load getYamlLoader(boolean allowDuplicateKeys) {
+        return allowDuplicateKeys ? legacyLoader : loader;
+    }
 
     /**
      * Loads the contents of the given YAML file into a map.
@@ -94,12 +109,29 @@ public class YamlParserUtils {
      * @throws YamlEngineException if the file cannot be parsed.
      * @throws IOException if an I/O error occurs while reading from the file stream.
      */
-    @SuppressWarnings("unchecked")
     public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file)
             throws Exception {
+        return loadYamlFile(file, false);
+    }
+
+    /**
+     * Loads the contents of the given YAML file into a map.
+     *
+     * @param file the YAML file to load.
+     * @param allowDuplicateKeys whether to allow duplicated keys.
+     * @return a non-null map representing the YAML content. If the file is empty or only contains
+     *     comments, an empty map is returned.
+     * @throws FileNotFoundException if the YAML file is not found.
+     * @throws YamlEngineException if the file cannot be parsed.
+     * @throws IOException if an I/O error occurs while reading from the file stream.
+     */
+    @SuppressWarnings("unchecked")
+    public static synchronized @Nonnull Map<String, Object> loadYamlFile(
+            File file, boolean allowDuplicateKeys) throws Exception {
         try (FileInputStream inputStream = new FileInputStream((file))) {
             Map<String, Object> yamlResult =
-                    (Map<String, Object>) loader.loadFromInputStream(inputStream);
+                    (Map<String, Object>)
+                            getYamlLoader(allowDuplicateKeys).loadFromInputStream(inputStream);
             return yamlResult == null ? new HashMap<>() : yamlResult;
         } catch (FileNotFoundException e) {
             LOG.error("Failed to find YAML file", e);
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
index 7e012fce9..b12681190 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
@@ -75,7 +75,8 @@ class ConfigurationUtilsTest {
     void loadConfigFile(String resourcePath) throws Exception {
         URL resource = Resources.getResource(resourcePath);
         Path path = Paths.get(resource.toURI());
-        Configuration configuration = ConfigurationUtils.loadConfigFile(path);
+        Configuration configuration =
+                ConfigurationUtils.loadConfigFile(path, resourcePath.endsWith("flink-conf.yaml"));
         Map<String, String> configMap = configuration.toMap();
         for (Map.Entry<ConfigOption<?>, Object> entry : CONFIG_OPTIONS.entrySet()) {
             String key = entry.getKey().key();
diff --git a/flink-cdc-cli/src/test/resources/flink-home/conf/flink-conf.yaml b/flink-cdc-cli/src/test/resources/flink-home/conf/flink-conf.yaml
index 27d1f712e..3d25a04bd 100644
--- a/flink-cdc-cli/src/test/resources/flink-home/conf/flink-conf.yaml
+++ b/flink-cdc-cli/src/test/resources/flink-home/conf/flink-conf.yaml
@@ -33,6 +33,9 @@ env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-export
 # automatically configure the host name based on the hostname of the node where the
 # JobManager runs.
 
+# Legacy flink-conf.yaml allows duplicate keys in yaml.
+# This key is meant to check if the yaml parser is able to handle duplicate keys.
+jobmanager.rpc.address: shaded
 jobmanager.rpc.address: localhost
 
 # The RPC port where the JobManager is reachable.
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 4f0d9e002..330717586 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -110,7 +110,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
                         mysqlInventoryDatabase.getDatabaseName());
         Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
-        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar);
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
         waitUntilJobRunning(Duration.ofSeconds(30));
         LOG.info("Pipeline job is running");
         waitUtilSpecificEvent(
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 72dff1818..a448bf554 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -174,8 +174,13 @@ public abstract class PipelineTestEnvironment extends TestLogger {
         Files.write(script, pipelineJob.getBytes());
         jobManager.copyFileToContainer(
                 MountableFile.forHostPath(script), "/tmp/flinkCDC/conf/pipeline.yaml");
+        StringBuilder sb = new StringBuilder();
+        for (Path jar : jars) {
+            sb.append(" --jar /tmp/flinkCDC/lib/").append(jar.getFileName());
+        }
         String commands =
-                "/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink";
+                "/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink"
+                        + sb;
         ExecResult execResult = jobManager.execInContainer("bash", "-c", commands);
         LOG.info(execResult.getStdout());
         LOG.error(execResult.getStderr());
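Putting the pieces together, the submission command assembled by the test environment should now carry one --jar flag per connector or driver jar, matching the --jar guidance added to the MySQL connector docs above. Below is a rough standalone sketch of that assembly (not part of the commit; the jar file names are taken from the MysqlE2eITCase resource names and may differ from the resolved artifact file names).

    // Rough reconstruction of the container command built by submitPipelineJob.
    public class SubmitCommandDemo {
        public static void main(String[] args) {
            String base = "/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml"
                    + " --flink-home /opt/flink";
            String[] jars = {
                "mysql-cdc-pipeline-connector.jar",
                "values-cdc-pipeline-connector.jar",
                "mysql-driver.jar"
            };
            StringBuilder commands = new StringBuilder(base);
            for (String jar : jars) {
                // Each jar copied into the container is handed to the CLI via --jar.
                commands.append(" --jar /tmp/flinkCDC/lib/").append(jar);
            }
            System.out.println(commands);
        }
    }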
