This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e04bc54e [FLINK-35360][cli] Support Yarn application mode for yaml job
5e04bc54e is described below

commit 5e04bc54ef14db9729d9c1012b680827d786f968
Author: MOBIN <18814118...@163.com>
AuthorDate: Thu Mar 27 18:18:09 2025 +0800

    [FLINK-35360][cli] Support Yarn application mode for yaml job
    
    This closes #3643.
    
    Co-authored-by: ouyangwulink <mr.art...@gmail.com>
---
 docs/content.zh/docs/deployment/yarn.md            |  28 +-
 docs/content/docs/deployment/yarn.md               |  27 +-
 flink-cdc-cli/pom.xml                              |   7 +
 .../java/org/apache/flink/cdc/cli/CliExecutor.java | 111 +++----
 .../java/org/apache/flink/cdc/cli/CliFrontend.java |  44 ++-
 .../cdc/cli/parser/PipelineDefinitionParser.java   |   3 +-
 .../cli/parser/YamlPipelineDefinitionParser.java   |   8 +-
 .../flink/cdc/cli/utils/ConfigurationUtils.java    |  19 +-
 .../flink/cdc/cli/utils/FlinkEnvironmentUtils.java |  35 +--
 .../org/apache/flink/cdc/cli/CliFrontendTest.java  |  20 +-
 .../parser/YamlPipelineDefinitionParserTest.java   |  26 +-
 .../cdc/cli/utils/ConfigurationUtilsTest.java      |   5 +-
 flink-cdc-composer/pom.xml                         |   6 +
 .../cdc/composer/PipelineDeploymentExecutor.java   |   7 +-
 .../cdc/composer/flink/FlinkPipelineComposer.java  |  18 +-
 .../flink/deployment/ComposeDeployment.java        |  62 ++++
 .../flink/deployment/ComposeDeploymentFactory.java |  35 ---
 .../K8SApplicationDeploymentExecutor.java          |  17 +-
 .../YarnApplicationDeploymentExecutor.java         | 155 ++++++++++
 .../flink/deployment/ComposeDeploymentTest.java    |  40 +++
 .../flink/cdc/common/test/utils/TestUtils.java     |  32 +-
 .../flink-cdc-pipeline-e2e-tests/pom.xml           | 175 +++++++++++
 .../tests/MysqlE2eWithYarnApplicationITCase.java   | 191 ++++++++++++
 .../tests/utils/PipelineTestOnYarnEnvironment.java | 322 +++++++++++++++++++++
 24 files changed, 1185 insertions(+), 208 deletions(-)

diff --git a/docs/content.zh/docs/deployment/yarn.md b/docs/content.zh/docs/deployment/yarn.md
index 4a1889791..89d587d5b 100644
--- a/docs/content.zh/docs/deployment/yarn.md
+++ b/docs/content.zh/docs/deployment/yarn.md
@@ -42,23 +42,20 @@ Flink can allocate resources based on the number of slots required by the jobs running on the JobManager
 ```bash
 export HADOOP_CLASSPATH=`hadoop classpath`
 ```
-
-## Session Mode
-
 Flink runs on all UNIX-like environments, i.e. Linux, Mac OS X, and Cygwin (for Windows).
 You can refer to the [overview]({{< ref "docs/connectors/pipeline-connectors/overview" >}}) to check the supported versions and download [the binary release](https://flink.apache.org/downloads/) of Flink,
 then extract the archive:
-
 ```bash
 tar -xzf flink-*.tgz
 ```
-
 You need to set the `FLINK_HOME` environment variable, for example:
 
 ```bash
 export FLINK_HOME=/path/flink-*
 ```
 
+## Session Mode
+
 ### Starting a Flink Session on YARN
 
 Once the `HADOOP_CLASSPATH` environment variable is set, you can launch a Flink job in a YARN session:
@@ -150,4 +147,23 @@ Job Description: Sync MySQL Database to Doris
 
 You can find a job named `Sync MySQL Database to Doris` through the Flink Web UI.
 
-Please note that submitting to application mode clusters and per-job mode clusters is not supported for now.
+## YARN Application Mode
+YARN application mode is the recommended way to run Flink jobs on a YARN cluster. It offers more flexible resource management and allocation, making better use of cluster resources.
+
+Submit a job to a Flink YARN application cluster via the CLI:
+```bash
+cd /path/flink-cdc-*
+./bin/flink-cdc.sh -t yarn-application -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml
+```
+Or resume a Flink CDC job from a savepoint:
+```bash
+cd /path/flink-cdc-*
+./bin/flink-cdc.sh -t yarn-application -s hdfs:///flink/savepoint-1537 -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml
+```
+A successful submission returns the following information:
+```bash
+Pipeline has been submitted to cluster.
+Job ID: application_1728995081590_1254
+Job Description: submit job successful
+```
+You can find the application with application_id `application_1728995081590_1254` through the YARN Web UI.
diff --git a/docs/content/docs/deployment/yarn.md b/docs/content/docs/deployment/yarn.md
index 068f9f218..4969ccf22 100644
--- a/docs/content/docs/deployment/yarn.md
+++ b/docs/content/docs/deployment/yarn.md
@@ -42,9 +42,6 @@ This *Getting Started* section assumes a functional YARN environment, starting f
 ```bash
 export HADOOP_CLASSPATH=`hadoop classpath`
 ```
-
-## Session Mode
-
 Flink runs on all UNIX-like environments, i.e. Linux, Mac OS X, and Cygwin (for Windows).
 You can refer to the [overview]({{< ref "docs/connectors/pipeline-connectors/overview" >}}) to check supported versions and download [the binary release](https://flink.apache.org/downloads/) of Flink,
 then extract the archive:
@@ -59,6 +56,8 @@ You should set `FLINK_HOME` environment variables like:
 export FLINK_HOME=/path/flink-*
 ```
 
+## Session Mode
+
 ### Starting a Flink Session on YARN
 
 Once you've made sure that the `HADOOP_CLASSPATH` environment variable is set, you can launch a Flink on YARN session:
@@ -150,4 +149,24 @@ Job Description: Sync MySQL Database to Doris
 
 You can find a job named `Sync MySQL Database to Doris` running through the Flink Web UI.
 
-Please note that submitting to application mode clusters and per-job mode clusters is not supported for now.
+## YARN Application Mode
+YARN application mode is the recommended approach for running Flink jobs on a YARN cluster. It offers more flexible resource management and allocation, enabling better utilization of cluster resources.
+
+To submit a job to a Flink YARN application cluster using the CLI:
+```bash
+cd /path/flink-cdc-*
+./bin/flink-cdc.sh -t yarn-application -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml
+```
+Or resume a Flink CDC job from a savepoint:
+```bash
+cd /path/flink-cdc-*
+./bin/flink-cdc.sh -t yarn-application -s hdfs:///flink/savepoint-1537 -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml
+```
+After a successful submission, the following information is returned:
+```bash
+Pipeline has been submitted to cluster.
+Job ID: application_1728995081590_1254
+Job Description: submit job successful
+```
+You can find the application with application_id `application_1728995081590_1254` running through the YARN Web UI.
+
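+The examples above assume a `mysql-to-doris.yaml` pipeline definition already exists. A minimal sketch of such a file (the hostnames, credentials, and table patterns below are placeholders, not part of this change) might look like:
+```yaml
+# Placeholder pipeline definition; adjust endpoints and credentials for your environment.
+source:
+  type: mysql
+  hostname: localhost
+  port: 3306
+  username: root
+  password: pass
+  tables: app_db.\.*
+
+sink:
+  type: doris
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+  name: Sync MySQL Database to Doris
+  parallelism: 2
+```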
diff --git a/flink-cdc-cli/pom.xml b/flink-cdc-cli/pom.xml
index c4ec22bbc..922c0f384 100644
--- a/flink-cdc-cli/pom.xml
+++ b/flink-cdc-cli/pom.xml
@@ -63,6 +63,13 @@ limitations under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
index 50107da11..ce7d973e2 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
@@ -19,85 +19,103 @@ package org.apache.flink.cdc.cli;
 
 import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
 import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
-import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
-import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
-import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment;
+import org.apache.flink.cdc.composer.flink.deployment.K8SApplicationDeploymentExecutor;
+import org.apache.flink.cdc.composer.flink.deployment.YarnApplicationDeploymentExecutor;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import org.apache.commons.cli.CommandLine;
 
-import java.nio.file.Path;
 import java.util.List;
 
+import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.REMOTE;
+
 /** Executor for doing the composing and submitting logic for {@link CliFrontend}. */
 public class CliExecutor {
 
     private final Path pipelineDefPath;
-    private final Configuration flinkConfig;
+    private final org.apache.flink.configuration.Configuration flinkConfig;
     private final Configuration globalPipelineConfig;
-    private final boolean useMiniCluster;
     private final List<Path> additionalJars;
-
+    private final Path flinkHome;
     private final CommandLine commandLine;
-
     private PipelineComposer composer = null;
 
-    private final SavepointRestoreSettings savepointSettings;
-
     public CliExecutor(
             CommandLine commandLine,
             Path pipelineDefPath,
-            Configuration flinkConfig,
+            org.apache.flink.configuration.Configuration flinkConfig,
             Configuration globalPipelineConfig,
-            boolean useMiniCluster,
             List<Path> additionalJars,
-            SavepointRestoreSettings savepointSettings) {
+            Path flinkHome) {
         this.commandLine = commandLine;
         this.pipelineDefPath = pipelineDefPath;
         this.flinkConfig = flinkConfig;
         this.globalPipelineConfig = globalPipelineConfig;
-        this.useMiniCluster = useMiniCluster;
         this.additionalJars = additionalJars;
-        this.savepointSettings = savepointSettings;
+        this.flinkHome = flinkHome;
     }
 
     public PipelineExecution.ExecutionInfo run() throws Exception {
        // Create a submit executor to deploy the Flink CDC job, or run it directly
-        boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine);
-        if (isDeploymentMode) {
-            ComposeDeploymentFactory composeDeploymentFactory = new ComposeDeploymentFactory();
-            PipelineDeploymentExecutor composeExecutor =
-                    composeDeploymentFactory.getFlinkComposeExecutor(commandLine);
-            org.apache.flink.configuration.Configuration configuration =
-                    org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap());
-            SavepointRestoreSettings.toConfiguration(savepointSettings, configuration);
-            return composeExecutor.deploy(commandLine, configuration, additionalJars);
-        } else {
-            // Run CDC Job And Parse pipeline definition file
-            PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
-            PipelineDef pipelineDef =
-                    pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
-            // Create composer
-            PipelineComposer composer = getComposer();
-            // Compose pipeline
-            PipelineExecution execution = composer.compose(pipelineDef);
-            // Execute or submit the pipeline
-            return execution.execute();
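+        // Dispatch on the effective execution.target: application targets hand the job to a
+        // deployment executor, while local / remote / session targets compose and run it directly.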
+        String deploymentTargetStr = getDeploymentTarget();
+        ComposeDeployment deploymentTarget =
+                ComposeDeployment.getDeploymentFromName(deploymentTargetStr);
+        switch (deploymentTarget) {
+            case KUBERNETES_APPLICATION:
+                return deployWithApplicationComposer(new K8SApplicationDeploymentExecutor());
+            case YARN_APPLICATION:
+                return deployWithApplicationComposer(new YarnApplicationDeploymentExecutor());
+            case LOCAL:
+                return deployWithComposer(FlinkPipelineComposer.ofMiniCluster());
+            case REMOTE:
+            case YARN_SESSION:
+                return deployWithComposer(
+                        FlinkPipelineComposer.ofRemoteCluster(flinkConfig, additionalJars));
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Deployment target %s is not supported", deploymentTargetStr));
         }
     }
 
-    private PipelineComposer getComposer() throws Exception {
-        if (composer == null) {
-            return FlinkEnvironmentUtils.createComposer(
-                    useMiniCluster, flinkConfig, additionalJars, savepointSettings);
-        }
-        return composer;
+    private PipelineExecution.ExecutionInfo deployWithApplicationComposer(
+            PipelineDeploymentExecutor composeExecutor) throws Exception {
+        return composeExecutor.deploy(commandLine, flinkConfig, additionalJars, flinkHome);
+    }
+
+    private PipelineExecution.ExecutionInfo deployWithComposer(PipelineComposer composer)
+            throws Exception {
+        PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef =
+                pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
+        PipelineExecution execution = composer.compose(pipelineDef);
+        return execution.execute();
+    }
+
+    @VisibleForTesting
+    public PipelineExecution.ExecutionInfo deployWithNoOpComposer() throws Exception {
+        return deployWithComposer(this.composer);
+    }
+
+    // The main entrypoint when running in application mode on the cluster
+    public static void main(String[] args) throws Exception {
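+        // The first argument carries the pipeline definition handed over via APPLICATION_ARGS
+        // by the application deployment executors.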
+        PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = pipelineDefinitionParser.parse(args[0], new Configuration());
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        FlinkPipelineComposer flinkPipelineComposer =
+                FlinkPipelineComposer.ofApplicationCluster(env);
+        PipelineExecution execution = flinkPipelineComposer.compose(pipelineDef);
+        execution.execute();
     }
 
     @VisibleForTesting
@@ -106,7 +124,7 @@ public class CliExecutor {
     }
 
     @VisibleForTesting
-    public Configuration getFlinkConfig() {
+    public org.apache.flink.configuration.Configuration getFlinkConfig() {
         return flinkConfig;
     }
 
@@ -120,12 +138,7 @@ public class CliExecutor {
         return additionalJars;
     }
 
-    @VisibleForTesting
     public String getDeploymentTarget() {
-        return commandLine.getOptionValue("target");
-    }
-
-    public SavepointRestoreSettings getSavepointSettings() {
-        return savepointSettings;
+        return flinkConfig.get(DeploymentOptions.TARGET);
     }
 }
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
index 40b4061f6..5f327eca2 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
@@ -25,9 +25,13 @@ import org.apache.flink.cdc.common.configuration.ConfigOptions;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
+import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -36,9 +40,8 @@ import org.apache.commons.cli.Options;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.lang.reflect.InvocationTargetException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -49,6 +52,10 @@ import static org.apache.flink.cdc.cli.CliFrontendOptions.FLINK_CONFIG;
 import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
 import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE;
 import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.USE_MINI_CLUSTER;
+import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.LOCAL;
+import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.REMOTE;
 
 /** The frontend entrypoint for the command-line interface of Flink CDC. */
 public class CliFrontend {
@@ -86,7 +93,7 @@ public class CliFrontend {
                     "Missing pipeline definition file path in arguments. ");
         }
 
-        Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
+        Path pipelineDefPath = new Path(unparsedArgs.get(0));
         // Take the first unparsed argument as the pipeline definition file
         LOG.info("Real Path pipelineDefPath {}", pipelineDefPath);
         // Global pipeline configuration
@@ -94,13 +101,17 @@ public class CliFrontend {
 
         // Load Flink environment
         Path flinkHome = getFlinkHome(commandLine);
-        Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
+        Configuration configuration = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
 
         // To override the Flink configuration
-        overrideFlinkConfiguration(flinkConfig, commandLine);
+        overrideFlinkConfiguration(configuration, commandLine);
+
+        org.apache.flink.configuration.Configuration flinkConfig =
+                org.apache.flink.configuration.Configuration.fromMap(configuration.toMap());
 
         // Savepoint
        SavepointRestoreSettings savepointSettings = createSavepointRestoreSettings(commandLine);
+        SavepointRestoreSettings.toConfiguration(savepointSettings, flinkConfig);
 
         // Additional JARs
         List<Path> additionalJars =
@@ -108,7 +119,7 @@ public class CliFrontend {
                                Optional.ofNullable(
                                                commandLine.getOptionValues(CliFrontendOptions.JAR))
                                         .orElse(new String[0]))
-                        .map(Paths::get)
+                        .map(Path::new)
                         .collect(Collectors.toList());
 
         // Build executor
@@ -117,13 +128,21 @@ public class CliFrontend {
                 pipelineDefPath,
                 flinkConfig,
                 globalPipelineConfig,
-                commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER),
                 additionalJars,
-                savepointSettings);
+                flinkHome);
     }
 
     private static void overrideFlinkConfiguration(
             Configuration flinkConfig, CommandLine commandLine) {
+
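+        // Resolve the deployment target: --use-mini-cluster forces "local"; otherwise -t/--target
+        // is used, defaulting to "remote", and the result is written into the Flink configuration.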
+        String target =
+                commandLine.hasOption(USE_MINI_CLUSTER)
+                        ? LOCAL.getName()
+                        : commandLine.getOptionValue(TARGET, REMOTE.getName());
+        flinkConfig.set(
+                ConfigOptions.key(DeploymentOptions.TARGET.key()).stringType().defaultValue(target),
+                target);
+
        Properties properties = commandLine.getOptionProperties(FLINK_CONFIG.getOpt());
         LOG.info("Dynamic flink config items found: {}", properties);
         for (String key : properties.stringPropertyNames()) {
@@ -194,14 +213,14 @@ public class CliFrontend {
        String flinkHomeFromArgs = commandLine.getOptionValue(CliFrontendOptions.FLINK_HOME);
        if (flinkHomeFromArgs != null) {
            LOG.debug("Flink home is loaded by command-line argument: {}", flinkHomeFromArgs);
-            return Paths.get(flinkHomeFromArgs);
+            return new Path(flinkHomeFromArgs);
         }
 
         // Fallback to environment variable
         String flinkHomeFromEnvVar = System.getenv(FLINK_HOME_ENV_VAR);
         if (flinkHomeFromEnvVar != null) {
             LOG.debug("Flink home is loaded by environment variable: {}", 
flinkHomeFromEnvVar);
-            return Paths.get(flinkHomeFromEnvVar);
+            return new Path(flinkHomeFromEnvVar);
         }
 
         throw new IllegalArgumentException(
@@ -214,7 +233,7 @@ public class CliFrontend {
         // Try to get global config path from command line
        String globalConfig = commandLine.getOptionValue(CliFrontendOptions.GLOBAL_CONFIG);
        if (globalConfig != null) {
-            Path globalConfigPath = Paths.get(globalConfig);
+            Path globalConfigPath = new Path(globalConfig);
            LOG.info("Using global config in command line: {}", globalConfigPath);
             return ConfigurationUtils.loadConfigFile(globalConfigPath);
         }
@@ -223,7 +242,8 @@ public class CliFrontend {
         String flinkCdcHome = System.getenv(FLINK_CDC_HOME_ENV_VAR);
         if (flinkCdcHome != null) {
             Path globalConfigPath =
-                    Paths.get(flinkCdcHome).resolve("conf").resolve("flink-cdc.yaml");
+                    new Path(
+                            flinkCdcHome, Joiner.on(File.separator).join("conf", "flink-cdc.yaml"));
            LOG.info("Using global config in FLINK_CDC_HOME: {}", globalConfigPath);
             return ConfigurationUtils.loadConfigFile(globalConfigPath);
         }
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
index bfcde27bd..c86002214 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java
@@ -19,8 +19,7 @@ package org.apache.flink.cdc.cli.parser;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
-
-import java.nio.file.Path;
+import org.apache.flink.core.fs.Path;
 
 /** Parsing pipeline definition files and generate {@link PipelineDef}. */
 public interface PipelineDefinitionParser {
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index b594aa35b..5d00d5394 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -30,6 +30,9 @@ import org.apache.flink.cdc.composer.definition.SinkDef;
 import org.apache.flink.cdc.composer.definition.SourceDef;
 import org.apache.flink.cdc.composer.definition.TransformDef;
 import org.apache.flink.cdc.composer.definition.UdfDef;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -37,7 +40,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -103,7 +105,9 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     @Override
     public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig)
             throws Exception {
-        return parse(mapper.readTree(pipelineDefPath.toFile()), globalPipelineConfig);
+        FileSystem fileSystem = FileSystem.get(pipelineDefPath.toUri());
+        FSDataInputStream pipelineInStream = fileSystem.open(pipelineDefPath);
+        return parse(mapper.readTree(pipelineInStream), globalPipelineConfig);
     }
 
     @Override
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
index af1bc7318..7280c0679 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -18,18 +18,13 @@
 package org.apache.flink.cdc.cli.utils;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
-import org.apache.flink.client.deployment.executors.LocalExecutor;
-import org.apache.flink.client.deployment.executors.RemoteExecutor;
+import org.apache.flink.core.fs.Path;
 
-import org.apache.commons.cli.CommandLine;
-
-import java.nio.file.Path;
+import java.io.File;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;
-
 /** Utilities for handling {@link Configuration}. */
 public class ConfigurationUtils {
 
@@ -42,7 +37,8 @@ public class ConfigurationUtils {
     public static Configuration loadConfigFile(Path configPath, boolean allowDuplicateKeys)
             throws Exception {
         Map<String, Object> configMap =
-                YamlParserUtils.loadYamlFile(configPath.toFile(), allowDuplicateKeys);
+                YamlParserUtils.loadYamlFile(
+                        new File(configPath.toUri().getPath()), allowDuplicateKeys);
         return Configuration.fromMap(flattenConfigMap(configMap, ""));
     }
 
@@ -69,13 +65,6 @@ public class ConfigurationUtils {
         return flattenedMap;
     }
 
-    public static boolean isDeploymentMode(CommandLine commandLine) {
-        String target = commandLine.getOptionValue(TARGET);
-        return target != null
-                && !target.equalsIgnoreCase(LocalExecutor.NAME)
-                && !target.equalsIgnoreCase(RemoteExecutor.NAME);
-    }
-
     public static Class<?> getClaimModeClass() {
         try {
            return Class.forName("org.apache.flink.core.execution.RestoreMode");
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
index 198a80c19..f10b2fa06 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
@@ -18,14 +18,14 @@
 package org.apache.flink.cdc.cli.utils;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
-import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.core.fs.Path;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.file.Path;
-import java.util.List;
+import java.io.File;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
@@ -36,26 +36,19 @@ public class FlinkEnvironmentUtils {
     private static final String FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
-        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
-        if (flinkConfPath.toFile().exists()) {
+        Path flinkConfPath =
+                new Path(
+                        flinkHome,
+                        Joiner.on(File.separator).join(FLINK_CONF_DIR, FLINK_CONF_FILENAME));
+        if (flinkConfPath.getFileSystem().exists(flinkConfPath)) {
             return ConfigurationUtils.loadConfigFile(flinkConfPath);
         } else {
             return ConfigurationUtils.loadConfigFile(
-                    flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME), true);
-        }
-    }
-
-    public static FlinkPipelineComposer createComposer(
-            boolean useMiniCluster,
-            Configuration flinkConfig,
-            List<Path> additionalJars,
-            SavepointRestoreSettings savepointSettings) {
-        if (useMiniCluster) {
-            return FlinkPipelineComposer.ofMiniCluster();
+                    new Path(
+                            flinkHome,
+                            Joiner.on(File.separator)
+                                    .join(FLINK_CONF_DIR, LEGACY_FLINK_CONF_FILENAME)),
+                    true);
        }
-        org.apache.flink.configuration.Configuration configuration =
-                org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap());
-        SavepointRestoreSettings.toConfiguration(savepointSettings, configuration);
-        return FlinkPipelineComposer.ofRemoteCluster(configuration, additionalJars);
     }
 }
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
index 5b8a181e0..b9f42cefc 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.core.fs.Path;
 
 import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
 
@@ -37,6 +38,9 @@ import java.net.URL;
 import java.nio.file.Paths;
 import java.util.Map;
 
+import static org.apache.flink.configuration.StateRecoveryOptions.RESTORE_MODE;
+import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE;
+import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -101,11 +105,10 @@ class CliFrontendTest {
                         "-cm",
                         "no_claim",
                         "-n");
-        assertThat(executor.getSavepointSettings().getRestorePath())
+        assertThat(executor.getFlinkConfig().get(SAVEPOINT_PATH))
                 .isEqualTo(flinkHome() + "/savepoints/savepoint-1");
-        assertThat(executor.getSavepointSettings().getRestoreMode())
-                .isEqualTo(RestoreMode.NO_CLAIM);
-        assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue();
+        assertThat(executor.getFlinkConfig().get(RESTORE_MODE)).isEqualTo(RestoreMode.NO_CLAIM);
+        assertThat(executor.getFlinkConfig().get(SAVEPOINT_IGNORE_UNCLAIMED_STATE)).isTrue();
     }
 
     @Test
@@ -119,6 +122,11 @@ class CliFrontendTest {
                         "kubernetes-application",
                         "-n");
         
assertThat(executor.getDeploymentTarget()).isEqualTo("kubernetes-application");
+
+        executor =
+                createExecutor(
+                        pipelineDef(), "--flink-home", flinkHome(), "-t", 
"yarn-application", "-n");
+        
assertThat(executor.getDeploymentTarget()).isEqualTo("yarn-application");
     }
 
     @Test
@@ -128,7 +136,7 @@ class CliFrontendTest {
         CliExecutor executor =
                 createExecutor(
                         pipelineDef(), "--flink-home", flinkHome(), "--jar", 
aJar, "--jar", bJar);
-        assertThat(executor.getAdditionalJars()).contains(Paths.get(aJar), 
Paths.get(bJar));
+        assertThat(executor.getAdditionalJars()).contains(new Path(aJar), new 
Path(bJar));
     }
 
     @Test
@@ -142,7 +150,7 @@ class CliFrontendTest {
                         globalPipelineConfig());
         NoOpComposer composer = new NoOpComposer();
         executor.setComposer(composer);
-        PipelineExecution.ExecutionInfo executionInfo = executor.run();
+        PipelineExecution.ExecutionInfo executionInfo = executor.deployWithNoOpComposer();
        assertThat(executionInfo.getId()).isEqualTo("fake-id");
        assertThat(executionInfo.getDescription()).isEqualTo("fake-description");
     }
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 27cbbb1de..c8f8acd13 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.cdc.composer.definition.SinkDef;
 import org.apache.flink.cdc.composer.definition.SourceDef;
 import org.apache.flink.cdc.composer.definition.TransformDef;
 import org.apache.flink.cdc.composer.definition.UdfDef;
+import org.apache.flink.core.fs.Path;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
@@ -35,7 +36,6 @@ import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
 import org.junit.jupiter.api.Test;
 
 import java.net.URL;
-import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -61,7 +61,7 @@ class YamlPipelineDefinitionParserTest {
     void testParsingFullDefinition() throws Exception {
        URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
-        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
         assertThat(pipelineDef).isEqualTo(fullDef);
     }
 
@@ -69,7 +69,7 @@ class YamlPipelineDefinitionParserTest {
     void testParsingNecessaryOnlyDefinition() throws Exception {
        URL resource = Resources.getResource("definitions/pipeline-definition-with-optional.yaml");
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
-        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
         assertThat(pipelineDef).isEqualTo(defWithOptional);
     }
 
@@ -77,7 +77,7 @@ class YamlPipelineDefinitionParserTest {
     void testMinimizedDefinition() throws Exception {
        URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
-        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
         assertThat(pipelineDef).isEqualTo(minimizedDef);
     }
 
@@ -87,7 +87,7 @@ class YamlPipelineDefinitionParserTest {
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
         PipelineDef pipelineDef =
                 parser.parse(
-                        Paths.get(resource.toURI()),
+                        new Path(resource.toURI()),
                         Configuration.fromMap(
                                 ImmutableMap.<String, String>builder()
                                         .put("parallelism", "1")
@@ -99,7 +99,7 @@ class YamlPipelineDefinitionParserTest {
     void testEvaluateDefaultLocalTimeZone() throws Exception {
        URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
-        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
         assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE))
                 .isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue());
     }
@@ -110,7 +110,7 @@ class YamlPipelineDefinitionParserTest {
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
         PipelineDef pipelineDef =
                 parser.parse(
-                        Paths.get(resource.toURI()),
+                        new Path(resource.toURI()),
                         Configuration.fromMap(
                                 ImmutableMap.<String, String>builder()
                                         .put(
@@ -131,7 +131,7 @@ class YamlPipelineDefinitionParserTest {
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
         PipelineDef pipelineDef =
                 parser.parse(
-                        Paths.get(resource.toURI()),
+                        new Path(resource.toURI()),
                         Configuration.fromMap(
                                 ImmutableMap.<String, String>builder()
                                        .put(PIPELINE_LOCAL_TIME_ZONE.key(), "Asia/Shanghai")
@@ -141,7 +141,7 @@ class YamlPipelineDefinitionParserTest {
 
         pipelineDef =
                 parser.parse(
-                        Paths.get(resource.toURI()),
+                        new Path(resource.toURI()),
                         Configuration.fromMap(
                                 ImmutableMap.<String, String>builder()
                                        .put(PIPELINE_LOCAL_TIME_ZONE.key(), "GMT+08:00")
@@ -150,7 +150,7 @@ class YamlPipelineDefinitionParserTest {
 
         pipelineDef =
                 parser.parse(
-                        Paths.get(resource.toURI()),
+                        new Path(resource.toURI()),
                         Configuration.fromMap(
                                 ImmutableMap.<String, String>builder()
                                        .put(PIPELINE_LOCAL_TIME_ZONE.key(), "UTC")
@@ -165,7 +165,7 @@ class YamlPipelineDefinitionParserTest {
         assertThatThrownBy(
                         () ->
                                 parser.parse(
-                                        Paths.get(resource.toURI()),
+                                        new Path(resource.toURI()),
                                         Configuration.fromMap(
                                                ImmutableMap.<String, String>builder()
                                                         .put(
@@ -185,7 +185,7 @@ class YamlPipelineDefinitionParserTest {
        URL resource =
                Resources.getResource("definitions/pipeline-definition-full-with-repsym.yaml");
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
-        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
         assertThat(pipelineDef).isEqualTo(fullDefWithRouteRepSym);
     }
 
@@ -193,7 +193,7 @@ class YamlPipelineDefinitionParserTest {
     void testUdfDefinition() throws Exception {
        URL resource = Resources.getResource("definitions/pipeline-definition-with-udf.yaml");
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
-        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
         assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
     }
 
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
index 5da49ec5d..078830f3a 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.cli.utils;
 import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.ConfigOptions;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 
 import org.apache.flink.shaded.curator5.com.google.common.io.Resources;
 
@@ -28,8 +29,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.net.URL;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -72,7 +71,7 @@ class ConfigurationUtilsTest {
     @ValueSource(strings = {"flink-home/conf/config.yaml", "flink-home/conf/flink-conf.yaml"})
     void loadConfigFile(String resourcePath) throws Exception {
         URL resource = Resources.getResource(resourcePath);
-        Path path = Paths.get(resource.toURI());
+        Path path = new Path(resource.toURI());
         Configuration configuration =
                ConfigurationUtils.loadConfigFile(path, resourcePath.endsWith("flink-conf.yaml"));
         Map<String, String> configMap = configuration.toMap();
diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml
index 5971f3b45..0924951d0 100644
--- a/flink-cdc-composer/pom.xml
+++ b/flink-cdc-composer/pom.xml
@@ -61,6 +61,12 @@ limitations under the License.
             <artifactId>flink-kubernetes</artifactId>
             <version>${flink.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-pipeline-udf-examples</artifactId>
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
index 37d573e6b..b3b02ff7d 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
@@ -18,16 +18,19 @@
 package org.apache.flink.cdc.composer;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 
 import org.apache.commons.cli.CommandLine;
 
-import java.nio.file.Path;
 import java.util.List;
 
 /** PipelineDeploymentExecutor executes a Flink CDC job for a given deployment target. */
 public interface PipelineDeploymentExecutor {
 
     PipelineExecution.ExecutionInfo deploy(
-            CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
+            CommandLine commandLine,
+            Configuration flinkConfig,
+            List<Path> additionalJars,
+            Path flinkHome)
             throws Exception;
 }
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index 533868c56..9a1e0bfdd 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -35,14 +35,13 @@ import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
 import org.apache.flink.cdc.composer.flink.translator.TransformTranslator;
 import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
-import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.net.URI;
 import java.net.URL;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashSet;
 import java.util.List;
@@ -58,16 +57,13 @@ public class FlinkPipelineComposer implements PipelineComposer {
 
     public static FlinkPipelineComposer ofRemoteCluster(
            org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) {
-        org.apache.flink.configuration.Configuration effectiveConfiguration =
-                new org.apache.flink.configuration.Configuration();
-        // Use "remote" as the default target
-        effectiveConfiguration.set(DeploymentOptions.TARGET, "remote");
-        effectiveConfiguration.addAll(flinkConfig);
-        StreamExecutionEnvironment env = new StreamExecutionEnvironment(effectiveConfiguration);
+        StreamExecutionEnvironment env = new StreamExecutionEnvironment(flinkConfig);
         additionalJars.forEach(
                 jarPath -> {
                     try {
-                        FlinkEnvironmentUtils.addJar(env, jarPath.toUri().toURL());
+                        FlinkEnvironmentUtils.addJar(
+                                env,
+                                jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL());
                     } catch (Exception e) {
                         throw new RuntimeException(
                                 String.format(
@@ -79,6 +75,10 @@ public class FlinkPipelineComposer implements PipelineComposer {
         return new FlinkPipelineComposer(env, false);
     }
 
+    public static FlinkPipelineComposer ofApplicationCluster(StreamExecutionEnvironment env) {
+        return new FlinkPipelineComposer(env, false);
+    }
+
     public static FlinkPipelineComposer ofMiniCluster() {
         return new FlinkPipelineComposer(
                 StreamExecutionEnvironment.getExecutionEnvironment(), true);
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeployment.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeployment.java
new file mode 100644
index 000000000..5541941f6
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeployment.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
+
+/** Supported deployment targets for composing and deploying Flink CDC jobs. */
+public enum ComposeDeployment {
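+    // Constant names mirror Flink's execution.target strings and are matched case-insensitively.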
+    YARN_SESSION("yarn-session"),
+    YARN_APPLICATION("yarn-application"),
+    LOCAL("local"),
+    REMOTE("remote"),
+    KUBERNETES_APPLICATION("kubernetes-application");
+
+    private final String name;
+
+    ComposeDeployment(final String name) {
+        this.name = checkNotNull(name);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public static ComposeDeployment getDeploymentFromName(final String deploymentTargetStr) {
+        return Arrays.stream(ComposeDeployment.values())
+                .filter(d -> d.name.equalsIgnoreCase(deploymentTargetStr))
+                .findFirst()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        "Unknown deployment target \""
+                                                + deploymentTargetStr
+                                                + "\"."
+                                                + " The available options are: 
"
+                                                + options()));
+    }
+
+    private static String options() {
+        return Arrays.stream(ComposeDeployment.values())
+                .map(ComposeDeployment::getName)
+                .collect(Collectors.joining(","));
+    }
+}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
deleted file mode 100644
index 27a005721..000000000
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.composer.flink.deployment;
-
-import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
-
-import org.apache.commons.cli.CommandLine;
-
-/** Create deployment methods corresponding to different goals. */
-public class ComposeDeploymentFactory {
-
-    public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine) {
-        String target = commandLine.getOptionValue("target");
-        if (target.equalsIgnoreCase("kubernetes-application")) {
-            return new K8SApplicationDeploymentExecutor();
-        }
-        throw new IllegalArgumentException(
-                String.format("Deployment target %s is not supported", target));
-    }
-}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
index 19fcdb262..4a23d05cc 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
@@ -24,32 +24,33 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 
 import org.apache.commons.cli.CommandLine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 
 /** Deploys a Flink CDC job in native K8s application mode. */
 public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecutor {
-
     private static final Logger LOG =
             LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class);
 
+    private static final String APPLICATION_MAIN_CLASS = "org.apache.flink.cdc.cli.CliExecutor";
+
     @Override
     public PipelineExecution.ExecutionInfo deploy(
-            CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) {
+            CommandLine commandLine,
+            Configuration flinkConfig,
+            List<Path> additionalJars,
+            Path flinkHome) {
         LOG.info("Submitting application in 'Flink K8S Application Mode'.");
-        flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
         List<String> jars = new ArrayList<>();
         if (flinkConfig.get(PipelineOptions.JARS) == null) {
            // the flink-cdc-dist jar must be added by default from the Docker container path
@@ -59,9 +60,7 @@ public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecu
         // set the default cdc latest docker image
        flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink/flink-cdc:latest");
        flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, commandLine.getArgList());
-        flinkConfig.set(
-                ApplicationConfiguration.APPLICATION_MAIN_CLASS,
-                "org.apache.flink.cdc.cli.CliFrontend");
+        flinkConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, APPLICATION_MAIN_CLASS);
         KubernetesClusterClientFactory kubernetesClusterClientFactory =
                 new KubernetesClusterClientFactory();
         KubernetesClusterDescriptor descriptor =
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/YarnApplicationDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/YarnApplicationDeploymentExecutor.java
new file mode 100644
index 000000000..16b9b1501
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/YarnApplicationDeploymentExecutor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Deploys a Flink CDC job in YARN application mode. */
+public class YarnApplicationDeploymentExecutor implements PipelineDeploymentExecutor {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(YarnApplicationDeploymentExecutor.class);
+
+    private static final String FLINK_CDC_HOME_ENV_VAR = "FLINK_CDC_HOME";
+    private static final String FLINK_CDC_DIST_JAR_PATTERN =
+            "^flink-cdc-dist-(\\d+(\\.\\d+)*)(-SNAPSHOT)?\\.jar$";
+    private static final String APPLICATION_MAIN_CLASS = "org.apache.flink.cdc.cli.CliExecutor";
+
+    @Override
+    public PipelineExecution.ExecutionInfo deploy(
+            CommandLine commandLine,
+            Configuration flinkConfig,
+            List<Path> additionalJars,
+            Path flinkHome)
+            throws Exception {
+        LOG.info("Submitting application in 'Flink Yarn Application Mode'.");
+        if (flinkConfig.get(PipelineOptions.JARS) == null) {
+            flinkConfig.set(
+                    PipelineOptions.JARS, Collections.singletonList(getFlinkCDCDistJarFromEnv()));
+        }
+        flinkConfig.set(
+                YarnConfigOptions.SHIP_FILES,
+                additionalJars.stream().map(Path::toString).collect(Collectors.toList()));
+
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+
+        Path pipelinePath = new Path(commandLine.getArgList().get(0));
+        FileSystem fileSystem = FileSystem.get(pipelinePath.toUri());
+        FSDataInputStream pipelineInStream = fileSystem.open(pipelinePath);
+
+        flinkConfig.set(
+                ApplicationConfiguration.APPLICATION_ARGS,
+                Collections.singletonList(mapper.readTree(pipelineInStream).toString()));
+        YarnLogConfigUtil.setLogConfigFileInConfig(
+                flinkConfig, Joiner.on(File.separator).join(flinkHome, "conf"));
+
+        flinkConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, APPLICATION_MAIN_CLASS);
+        final YarnClusterClientFactory yarnClusterClientFactory = new YarnClusterClientFactory();
+        final YarnClusterDescriptor descriptor =
+                yarnClusterClientFactory.createClusterDescriptor(flinkConfig);
+        ClusterSpecification specification =
+                yarnClusterClientFactory.getClusterSpecification(flinkConfig);
+        ApplicationConfiguration applicationConfiguration =
+                ApplicationConfiguration.fromConfiguration(flinkConfig);
+
+        ClusterClient<ApplicationId> client = null;
+        try {
+            ClusterClientProvider<ApplicationId> clusterClientProvider =
+                    descriptor.deployApplicationCluster(specification, applicationConfiguration);
+            client = clusterClientProvider.getClusterClient();
+            ApplicationId clusterId = client.getClusterId();
+            LOG.info("Deployed Flink CDC job to cluster with application ID {}", clusterId);
+            return new PipelineExecution.ExecutionInfo(
+                    clusterId.toString(), "submitted job successfully");
+        } catch (Exception e) {
+            if (client != null) {
+                client.shutDownCluster();
+            }
+            throw new RuntimeException("Failed to deploy Flink CDC job", e);
+        } finally {
+            descriptor.close();
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+    private String getFlinkCDCDistJarFromEnv() throws IOException {
+        String flinkCDCHomeFromEnvVar = System.getenv(FLINK_CDC_HOME_ENV_VAR);
+        Preconditions.checkNotNull(
+                flinkCDCHomeFromEnvVar,
+                "The environment variable "
+                        + FLINK_CDC_HOME_ENV_VAR
+                        + " is not set; it must point to the Flink CDC home directory.");
+        Path flinkCDCLibPath = new Path(flinkCDCHomeFromEnvVar, "lib");
+        if (!flinkCDCLibPath.getFileSystem().exists(flinkCDCLibPath)
+                || !flinkCDCLibPath.getFileSystem().getFileStatus(flinkCDCLibPath).isDir()) {
+            throw new RuntimeException(
+                    "Flink CDC home lib path does not exist or is not a directory: "
+                            + flinkCDCLibPath.makeQualified(flinkCDCLibPath.getFileSystem()));
+        }
+
+        FileStatus[] fileStatuses = flinkCDCLibPath.getFileSystem().listStatus(flinkCDCLibPath);
+        Optional<Path> distJars =
+                Arrays.stream(fileStatuses)
+                        .filter(status -> !status.isDir())
+                        .map(FileStatus::getPath)
+                        .filter(path -> path.getName().matches(FLINK_CDC_DIST_JAR_PATTERN))
+                        .findFirst();
+
+        if (distJars.isPresent()) {
+            Path path = distJars.get().makeQualified(distJars.get().getFileSystem());
+            return path.toString();
+        } else {
+            throw new FileNotFoundException(
+                    "Failed to fetch Flink CDC dist jar from path: " + flinkCDCLibPath);
+        }
+    }
+}
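
Note how the pipeline YAML travels to the cluster: rather than shipping the file, the executor parses it with Jackson's YAML factory and passes the resulting JSON rendering as the single application argument, which `CliExecutor` presumably parses on the cluster side. A self-contained sketch of that conversion (the file name is hypothetical, and plain Jackson stands in for Flink's shaded copy):

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

public final class YamlToArgSketch {
    public static void main(String[] args) throws Exception {
        // The YAML-aware mapper parses the pipeline definition into a JsonNode tree.
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        try (InputStream in = Files.newInputStream(Paths.get("mysql-to-doris.yaml"))) {
            // JsonNode#toString renders compact JSON, so the whole definition
            // fits into one application argument string.
            String pipelineArg = mapper.readTree(in).toString();
            System.out.println(pipelineArg);
        }
    }
}
```
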
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentTest.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentTest.java
new file mode 100644
index 000000000..be207736c
--- /dev/null
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Test for {@link ComposeDeployment}. */
+public class ComposeDeploymentTest {
+    @Test
+    public void testComposeDeployment() throws Exception {
+        Assertions.assertThat(ComposeDeployment.getDeploymentFromName("yarn-application"))
+                .as("test yarn-application")
+                .isEqualTo(ComposeDeployment.YARN_APPLICATION);
+
+        Assertions.assertThat(ComposeDeployment.getDeploymentFromName("yarn-Application"))
+                .as("test ignore case")
+                .isEqualTo(ComposeDeployment.YARN_APPLICATION);
+
+        Assertions.assertThatThrownBy(() -> ComposeDeployment.getDeploymentFromName("unKnown"))
+                .as("test Unknown deployment target")
+                .hasMessage(
+                        "Unknown deployment target \"unKnown\". The available options are: yarn-session,yarn-application,local,remote,kubernetes-application");
+    }
+}
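
The `ComposeDeployment` enum exercised here is added elsewhere in this commit; judging only from these assertions, its lookup behaves roughly like the following sketch (case-insensitive name matching over the five listed targets):

```java
import java.util.Arrays;

/** Rough sketch of the lookup the test pins down; the real enum may differ in detail. */
enum ComposeDeploymentSketch {
    YARN_SESSION("yarn-session"),
    YARN_APPLICATION("yarn-application"),
    LOCAL("local"),
    REMOTE("remote"),
    KUBERNETES_APPLICATION("kubernetes-application");

    private final String name;

    ComposeDeploymentSketch(String name) {
        this.name = name;
    }

    static ComposeDeploymentSketch getDeploymentFromName(String name) {
        // Matching ignores case, so "yarn-Application" resolves like "yarn-application".
        return Arrays.stream(values())
                .filter(target -> target.name.equalsIgnoreCase(name))
                .findFirst()
                .orElseThrow(
                        () -> new IllegalArgumentException(
                                "Unknown deployment target \"" + name + "\"."));
    }
}
```
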
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java
 
b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java
index 7923bb9b9..1ccfd441a 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java
@@ -22,9 +22,8 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.List;
+import java.util.Optional;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** General test utilities. */
@@ -58,30 +57,23 @@ public class TestUtils {
         }
 
         try (Stream<Path> dependencyResources = Files.walk(path)) {
-            final List<Path> matchingResources =
+            Optional<Path> jarPath =
                     dependencyResources
                             .filter(
                                     jar ->
                                             Pattern.compile(resourceNameRegex)
                                                     .matcher(jar.toAbsolutePath().toString())
                                                     .find())
-                            .collect(Collectors.toList());
-            switch (matchingResources.size()) {
-                case 0:
-                    throw new RuntimeException(
-                            new FileNotFoundException(
-                                    String.format(
-                                            "No resource file could be found that matches the pattern %s. "
-                                                    + "This could mean that the test module must be rebuilt via maven.",
-                                            resourceNameRegex)));
-                case 1:
-                    return matchingResources.get(0);
-                default:
-                    throw new RuntimeException(
-                            new IOException(
-                                    String.format(
-                                            "Multiple resource files were found matching the pattern %s. Matches=%s",
-                                            resourceNameRegex, matchingResources)));
+                            .findFirst();
+            if (jarPath.isPresent()) {
+                return jarPath.get();
+            } else {
+                throw new RuntimeException(
+                        new FileNotFoundException(
+                                String.format(
+                                        "No resource file could be found that matches the pattern %s. "
+                                                + "This could mean that the test module must be rebuilt via maven.",
+                                        resourceNameRegex)));
             }
         } catch (final IOException ioe) {
             throw new RuntimeException("Could not search for resource files.", ioe);
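
With this change `getResource` returns the first match instead of failing when several files match the pattern, which the YARN tests rely on when resolving freshly built artifacts. Typical usage (the patterns shown are the ones used by the new IT case below):

```java
import java.nio.file.Path;

import org.apache.flink.cdc.common.test.utils.TestUtils;

class ResourceLookupExample {
    static void resolveArtifacts() {
        // Patterns are regexes matched against absolute paths under the module's
        // target directory; the first hit wins, so duplicate artifacts no longer
        // abort the test run.
        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
        Path yarnClasspath = TestUtils.getResource("yarn.classpath");
    }
}
```
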
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 47f4217cd..47bea3d27 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -36,6 +36,11 @@ limitations under the License.
         <!-- TODO: Update this, when StarRocks releases a 1.20 compatible connector. -->
         <starrocks.connector.version>1.2.10_flink-${flink-major-1.19}</starrocks.connector.version>
         <paimon.version>1.0.1</paimon.version>
+        <flink.hadoop.version>3.3.4</flink.hadoop.version>
+        <flink.release.download.skip>false</flink.release.download.skip>
+        <flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name>
+        <flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror>
+        <maven.plugin.download.version>1.6.8</maven.plugin.download.version>
     </properties>
 
     <dependencies>
@@ -108,6 +113,12 @@ limitations under the License.
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-serde</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -128,12 +139,24 @@ limitations under the License.
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-pipeline-connector-maxcompute</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -182,6 +205,122 @@ limitations under the License.
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <!-- mini yarn -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+            <version>${flink.hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <!-- This dependency is no longer shipped with the JDK since Java 9. -->
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${flink.hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+            <version>${flink.hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>jdk.tools</artifactId>
+                    <groupId>jdk.tools</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-server-tests</artifactId>
+            <version>${flink.hadoop.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <artifactId>jdk.tools</artifactId>
+                    <groupId>jdk.tools</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${flink.hadoop.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <artifactId>jdk.tools</artifactId>
+                    <groupId>jdk.tools</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -225,6 +364,31 @@ limitations under the License.
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>com.googlecode.maven-download-plugin</groupId>
+                <artifactId>download-maven-plugin</artifactId>
+                <version>${maven.plugin.download.version}</version>
+                <configuration>
+                    <cacheDirectory>${maven.plugin.download.cache.path}</cacheDirectory>
+                    <outputDirectory>${project.build.directory}</outputDirectory>
+                    <readTimeOut>60000</readTimeOut>
+                    <retries>3</retries>
+                    <unpack>true</unpack>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>download-flink-release</id>
+                        <goals>
+                            <goal>wget</goal>
+                        </goals>
+                        <phase>compile</phase>
+                        <configuration>
+                            <skip>${flink.release.download.skip}</skip>
+                            <url>${flink.release.mirror}/${flink.release.name}</url>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
 
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -237,6 +401,17 @@ limitations under the License.
                             <goal>copy</goal>
                         </goals>
                     </execution>
+                    <execution>
+                        <id>store-classpath-in-target-for-tests</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>build-classpath</goal>
+                        </goals>
+                        <configuration>
+                            <outputFile>${project.build.directory}/yarn.classpath</outputFile>
+                            <excludeGroupIds>org.apache.flink</excludeGroupIds>
+                        </configuration>
+                    </execution>
                 </executions>
                 <configuration>
                     <artifactItems>
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eWithYarnApplicationITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eWithYarnApplicationITCase.java
new file mode 100644
index 000000000..cab98c6af
--- /dev/null
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eWithYarnApplicationITCase.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestOnYarnEnvironment;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for the MySQL CDC pipeline job in YARN application mode. */
+public class MysqlE2eWithYarnApplicationITCase extends PipelineTestOnYarnEnvironment {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MysqlE2eWithYarnApplicationITCase.class);
+
+    // ------------------------------------------------------------------------------------------
+    // MySQL Variables (we always use MySQL as the data source for easier verification)
+    // ------------------------------------------------------------------------------------------
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+
+    @Container
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0) // v8 supports both ARM and AMD architectures
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(Network.newNetwork())
+                            .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+    @BeforeAll
+    public static void setup() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(MYSQL)).join();
+        LOG.info("Containers are started.");
+        LOG.info("Starting up MiniYARNCluster...");
+        startMiniYARNCluster();
+        LOG.info("MiniYARNCluster are started.");
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        mysqlInventoryDatabase.createAndInitialize();
+    }
+
+    @AfterEach
+    public void after() {
+        mysqlInventoryDatabase.dropDatabase();
+    }
+
+    @Test
+    public void testSyncWholeDatabase() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: %S\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "  scan.startup.mode: snapshot\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d",
+                        MYSQL.getHost(),
+                        MYSQL.getDatabasePort(),
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        1);
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        String applicationId =
+                submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+        LOG.info("Pipeline job is running");
+        validateResult(
+                applicationId,
+                "CreateTableEvent{tableId=%s.customers, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` 
VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[104, 
user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[103, 
user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[102, 
user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[101, 
user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
+                "CreateTableEvent{tableId=%s.products, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` 
VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` 
STRING}, primaryKeys=id, options=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[109, 
spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[107, 
rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[108, 
jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, 
meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[105, 
hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[106, 
hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[103, 
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 
0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[104, 
hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, 
{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[101, 
scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, 
{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[102, 
car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, 
{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}");
+    }
+
+    private void validateResult(String applicationId, String... expectedEvents) {
+        String dbName = mysqlInventoryDatabase.getDatabaseName();
+        List<String> expectedEventsList =
+                Arrays.stream(expectedEvents)
+                        .map(event -> String.format(event, dbName, dbName))
+                        .collect(Collectors.toList());
+        List<String> taskManagerOutContent = getTaskManagerOutContent(applicationId);
+        assertThat(taskManagerOutContent).containsExactlyInAnyOrderElementsOf(expectedEventsList);
+    }
+
+    public static List<String> getTaskManagerOutContent(String applicationId) {
+        Path resource =
+                TestUtils.getResource(
+                        YARN_CONFIGURATION.get(PipelineTestOnYarnEnvironment.TEST_CLUSTER_NAME_KEY),
+                        "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/target");
+        try (Stream<Path> taskManagerOutFilePath = Files.walk(resource)) {
+            Optional<File> taskManagerOutFile =
+                    taskManagerOutFilePath
+                            .filter(
+                                    path ->
+                                            path.getFileName().toString().equals("taskmanager.out")
+                                                    && path.toString().contains(applicationId))
+                            .map(Path::toFile)
+                            .findFirst();
+
+            if (taskManagerOutFile.isPresent()) {
+                return FileUtils.readLines(taskManagerOutFile.get(), Charset.defaultCharset());
+            } else {
+                throw new FileNotFoundException(
+                        String.format("taskmanager.out does not exist for %s", applicationId));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Could not search for %s directory.",
+                            YARN_CONFIGURATION.get(
+                                    PipelineTestOnYarnEnvironment.TEST_CLUSTER_NAME_KEY)),
+                    e);
+        }
+    }
+}
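
A detail worth noting in `validateResult`: every expected-event template is formatted with the database name passed twice, even though each template contains at most one `%s`. This is safe because Java's `Formatter` silently ignores surplus arguments, as this quick demonstration shows:

```java
public class FormatArgsDemo {
    public static void main(String[] args) {
        String template = "DataChangeEvent{tableId=%s.customers, op=INSERT}";
        // String.format ignores arguments beyond the format specifiers,
        // so passing dbName twice works for templates with one (or zero) %s.
        System.out.println(String.format(template, "db_1", "db_1"));
        // -> DataChangeEvent{tableId=db_1.customers, op=INSERT}
    }
}
```
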
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestOnYarnEnvironment.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestOnYarnEnvironment.java
new file mode 100644
index 000000000..e9998ad40
--- /dev/null
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestOnYarnEnvironment.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.utils;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static java.lang.Thread.sleep;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
+
+/** Test environment that runs pipeline jobs on a YARN mini-cluster. */
+public class PipelineTestOnYarnEnvironment extends TestLogger {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PipelineTestOnYarnEnvironment.class);
+
+    protected static final YarnConfiguration YARN_CONFIGURATION;
+    private YarnClient yarnClient = null;
+    protected static MiniYARNCluster yarnCluster = null;
+
+    protected static final String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name";
+    protected static final int NUM_NODEMANAGERS = 2;
+
+    protected static File yarnSiteXML = null;
+
+    @TempDir Path temporaryFolder;
+
+    private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(120);
+    private static final int sleepIntervalInMS = 100;
+
+    // copied from org.apache.flink.yarn.YarnTestBase
+    static {
+        YARN_CONFIGURATION = new YarnConfiguration();
+        YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32);
+        YARN_CONFIGURATION.setInt(
+                YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+                4096); // 4096 is the available memory anyways
+        YARN_CONFIGURATION.setBoolean(
+                YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
+        YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+        YARN_CONFIGURATION.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
+        YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4);
+        YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+        YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+        // memory is overwritten in the MiniYARNCluster, so we have to change
+        // the number of cores for testing.
+        YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666);
+        YARN_CONFIGURATION.setFloat(
+                YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0F);
+        YARN_CONFIGURATION.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, getYarnClasspath());
+        YARN_CONFIGURATION.setInt(
+                YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 1000);
+        YARN_CONFIGURATION.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 5000);
+        YARN_CONFIGURATION.set(TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-application");
+    }
+
+    @BeforeEach
+    public void setupYarnClient() throws Exception {
+        if (yarnClient == null) {
+            yarnClient = YarnClient.createYarnClient();
+            yarnClient.init(getYarnConfiguration());
+            yarnClient.start();
+        }
+    }
+
+    @AfterEach
+    public void shutdownYarnClient() {
+        yarnClient.stop();
+    }
+
+    @AfterAll
+    public static void teardown() {
+
+        if (yarnCluster != null) {
+            LOG.info("Stopping MiniYarn Cluster");
+            yarnCluster.stop();
+            yarnCluster = null;
+        }
+
+        // Unset FLINK_CONF_DIR, as it might change the behavior of other tests
+        Map<String, String> map = new HashMap<>(System.getenv());
+        map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
+        map.remove("YARN_CONF_DIR");
+        map.remove("IN_TESTS");
+        CommonTestUtils.setEnv(map);
+
+        if (yarnSiteXML != null) {
+            yarnSiteXML.delete();
+        }
+    }
+
+    protected static YarnConfiguration getYarnConfiguration() {
+        return YARN_CONFIGURATION;
+    }
+
+    public static void startMiniYARNCluster() {
+        try {
+            if (yarnCluster == null) {
+                final String testName =
+                        YARN_CONFIGURATION.get(PipelineTestOnYarnEnvironment.TEST_CLUSTER_NAME_KEY);
+                yarnCluster =
+                        new MiniYARNCluster(
+                                testName == null ? "YarnTest_" + UUID.randomUUID() : testName,
+                                NUM_NODEMANAGERS,
+                                1,
+                                1);
+
+                yarnCluster.init(YARN_CONFIGURATION);
+                yarnCluster.start();
+            }
+
+            File targetTestClassesFolder = new File("target/test-classes");
+            writeYarnSiteConfigXML(YARN_CONFIGURATION, targetTestClassesFolder);
+
+            Map<String, String> map = new HashMap<String, String>(System.getenv());
+            // see YarnClusterDescriptor() for more info
+            map.put("IN_TESTS", "yes we are in tests");
+            map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath());
+            map.put("MAX_LOG_FILE_NUMBER", "10");
+            CommonTestUtils.setEnv(map);
+
+            assertThat(yarnCluster.getServiceState()).isEqualTo(Service.STATE.STARTED);
+            // wait for the NodeManagers to connect
+            while (!yarnCluster.waitForNodeManagersToConnect(500)) {
+                LOG.info("Waiting for NodeManagers to connect");
+            }
+        } catch (Exception e) {
+            fail("Starting MiniYARNCluster failed: ", e);
+        }
+    }
+
+    // write yarn-site.xml to target/test-classes so that Flink can pick it up when
+    // initializing the YarnClient from the classpath
+    public static void writeYarnSiteConfigXML(Configuration yarnConf, File targetFolder)
+            throws IOException {
+        yarnSiteXML = new File(targetFolder, "/yarn-site.xml");
+        try (FileWriter writer = new FileWriter(yarnSiteXML)) {
+            yarnConf.writeXml(writer);
+            writer.flush();
+        }
+    }
+
+    public String submitPipelineJob(String pipelineJob, Path... jars) throws Exception {
+        ProcessBuilder processBuilder = new ProcessBuilder();
+        Map<String, String> env = getEnv();
+        processBuilder.environment().putAll(env);
+        Path yamlScript = temporaryFolder.resolve("mysql-to-values.yml");
+        Files.write(yamlScript, pipelineJob.getBytes());
+
+        List<String> commandList = new ArrayList<>();
+        commandList.add(env.get("FLINK_CDC_HOME") + "/bin/flink-cdc.sh");
+        commandList.add("-t");
+        commandList.add("yarn-application");
+        commandList.add(yamlScript.toAbsolutePath().toString());
+        for (Path jar : jars) {
+            commandList.add("--jar");
+            commandList.add(jar.toString());
+        }
+
+        processBuilder.command(commandList);
+        LOG.info("starting flink-cdc task with flink on yarn-application");
+        Process process = processBuilder.start();
+        process.waitFor();
+        String applicationIdStr = getApplicationId(process);
+        Preconditions.checkNotNull(
+                applicationIdStr, "applicationId should not be null, please 
check logs");
+        ApplicationId applicationId = 
ApplicationId.fromString(applicationIdStr);
+        waitApplicationFinished(applicationId, yarnAppTerminateTimeout, 
sleepIntervalInMS);
+        LOG.info("started flink-cdc task with flink on yarn-application");
+        return applicationIdStr;
+    }
+
+    public Map<String, String> getEnv() {
+        Path flinkHome =
+                TestUtils.getResource(
+                        "flink-\\d+(\\.\\d+)*$",
+                        "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/target");
+        Map<String, String> env = new HashMap<>();
+        env.put("FLINK_HOME", flinkHome.toString());
+        env.put("FLINK_CONF_DIR", flinkHome.resolve("conf").toString());
+        addFlinkConf(flinkHome.resolve("conf").resolve("config.yaml"));
+        Path flinkcdcHome =
+                TestUtils.getResource("flink-cdc-\\d+(\\.\\d+)*(-SNAPSHOT)?$", "flink-cdc-dist");
+        env.put("FLINK_CDC_HOME", flinkcdcHome.toString());
+        env.put("HADOOP_CLASSPATH", getYarnClasspath());
+        return env;
+    }
+
+    public void addFlinkConf(Path flinkConf) {
+        Map<String, String> configToAppend = new HashMap<>();
+        configToAppend.put("akka.ask.timeout", "100s");
+        configToAppend.put("web.timeout", "1000000");
+        configToAppend.put("taskmanager.slot.timeout", "1000s");
+        configToAppend.put("slot.request.timeout", "120000");
+        try {
+            if (!Files.exists(flinkConf)) {
+                throw new FileNotFoundException("config.yaml not found at " + flinkConf);
+            }
+            List<String> lines = new ArrayList<>(Files.readAllLines(flinkConf));
+            for (Map.Entry<String, String> entry : configToAppend.entrySet()) {
+                lines.add(entry.getKey() + ": " + entry.getValue());
+            }
+            Files.write(
+                    flinkConf,
+                    lines,
+                    StandardOpenOption.WRITE,
+                    StandardOpenOption.TRUNCATE_EXISTING);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to append configuration to 
config.yaml", e);
+        }
+    }
+
+    public String getApplicationId(Process process) throws IOException {
+        BufferedReader reader =
+                new BufferedReader(new InputStreamReader(process.getInputStream()));
+        String line;
+        while ((line = reader.readLine()) != null) {
+            LOG.info(line);
+            if (line.startsWith("Job ID")) {
+                return line.split(":")[1].trim();
+            }
+        }
+        return null;
+    }
+
+    protected void waitApplicationFinished(
+            ApplicationId applicationId, Duration timeout, int sleepIntervalInMS) throws Exception {
+        Deadline deadline = Deadline.now().plus(timeout);
+        YarnApplicationState state =
+                getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+
+        while (state != YarnApplicationState.FINISHED) {
+            if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+                fail("Application became FAILED or KILLED while expecting FINISHED");
+            }
+
+            if (deadline.isOverdue()) {
+                getYarnClient().killApplication(applicationId);
+                fail("Application didn't finish before timeout");
+            }
+
+            sleep(sleepIntervalInMS);
+            state =
+                    getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+        }
+    }
+
+    @Nullable
+    protected YarnClient getYarnClient() {
+        return yarnClient;
+    }
+
+    /**
+     * Searches for the yarn.classpath file generated by the "dependency:build-classpath" maven
+     * plugin execution in this module.
+     *
+     * @return a classpath suitable for running all YARN-launched JVMs
+     */
+    private static String getYarnClasspath() {
+        Path yarnClasspathFile = TestUtils.getResource("yarn.classpath");
+        try {
+            return FileUtils.readFileToString(yarnClasspathFile.toFile(), StandardCharsets.UTF_8);
+        } catch (Throwable t) {
+            LOG.error(
+                    "Error while getting YARN classpath in {}",
+                    yarnClasspathFile.toFile().getAbsoluteFile(),
+                    t);
+            throw new RuntimeException("Error while getting YARN classpath", 
t);
+        }
+    }
+}
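
Two pieces of wiring make this environment work: the mini cluster's configuration is written to `target/test-classes/yarn-site.xml` so that anything creating a `YarnClient` from the classpath (including the externally spawned `flink-cdc.sh` process) talks to the same cluster, and the `yarn.classpath` file generated by the `dependency:build-classpath` execution in the pom above supplies the Hadoop jars for YARN-launched JVMs. A condensed sketch of the client-side half, under the same assumptions:

```java
import java.io.File;
import java.io.FileWriter;

import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

class YarnClientWiringSketch {
    static YarnClient clientFor(YarnConfiguration conf, File confDir) throws Exception {
        // Persist the test configuration where YARN_CONF_DIR points, so external
        // processes resolve the same MiniYARNCluster.
        try (FileWriter writer = new FileWriter(new File(confDir, "yarn-site.xml"))) {
            conf.writeXml(writer);
        }
        // The in-process client can take the configuration object directly.
        YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();
        return client;
    }
}
```
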
