This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 5e04bc54e [FLINK-35360][cli] Support Yarn application mode for yaml job 5e04bc54e is described below commit 5e04bc54ef14db9729d9c1012b680827d786f968 Author: MOBIN <18814118...@163.com> AuthorDate: Thu Mar 27 18:18:09 2025 +0800 [FLINK-35360][cli] Support Yarn application mode for yaml job This closes #3643. Co-authored-by: ouyangwulink <mr.art...@gmail.com> --- docs/content.zh/docs/deployment/yarn.md | 28 +- docs/content/docs/deployment/yarn.md | 27 +- flink-cdc-cli/pom.xml | 7 + .../java/org/apache/flink/cdc/cli/CliExecutor.java | 111 +++---- .../java/org/apache/flink/cdc/cli/CliFrontend.java | 44 ++- .../cdc/cli/parser/PipelineDefinitionParser.java | 3 +- .../cli/parser/YamlPipelineDefinitionParser.java | 8 +- .../flink/cdc/cli/utils/ConfigurationUtils.java | 19 +- .../flink/cdc/cli/utils/FlinkEnvironmentUtils.java | 35 +-- .../org/apache/flink/cdc/cli/CliFrontendTest.java | 20 +- .../parser/YamlPipelineDefinitionParserTest.java | 26 +- .../cdc/cli/utils/ConfigurationUtilsTest.java | 5 +- flink-cdc-composer/pom.xml | 6 + .../cdc/composer/PipelineDeploymentExecutor.java | 7 +- .../cdc/composer/flink/FlinkPipelineComposer.java | 18 +- .../flink/deployment/ComposeDeployment.java | 62 ++++ .../flink/deployment/ComposeDeploymentFactory.java | 35 --- .../K8SApplicationDeploymentExecutor.java | 17 +- .../YarnApplicationDeploymentExecutor.java | 155 ++++++++++ .../flink/deployment/ComposeDeploymentTest.java | 40 +++ .../flink/cdc/common/test/utils/TestUtils.java | 32 +- .../flink-cdc-pipeline-e2e-tests/pom.xml | 175 +++++++++++ .../tests/MysqlE2eWithYarnApplicationITCase.java | 191 ++++++++++++ .../tests/utils/PipelineTestOnYarnEnvironment.java | 322 +++++++++++++++++++++ 24 files changed, 1185 insertions(+), 208 deletions(-) diff --git a/docs/content.zh/docs/deployment/yarn.md b/docs/content.zh/docs/deployment/yarn.md index 4a1889791..89d587d5b 100644 --- a/docs/content.zh/docs/deployment/yarn.md +++ b/docs/content.zh/docs/deployment/yarn.md @@ -42,23 +42,20 @@ Flink 可以根据在 JobManager 上运行的作业处理所需的 slot 数量 ```bash export HADOOP_CLASSPATH=`hadoop classpath` ``` - -## Session 模式 - Flink 在所有类 UNIX 的环境中运行,即在 Linux、Mac OS X 以及(针对 Windows 的)Cygwin 上运行。 你可以参考[概览]({{< ref "docs/connectors/pipeline-connectors/overview" >}})来检查支持的版本并下载[Flink二进制版本](https://flink.apache.org/downloads/), 然后解压文件: - ```bash tar -xzf flink-*.tgz ``` - 你需要设置 `FLINK_HOME` 环境变量,比如: ```bash export FLINK_HOME=/path/flink-* ``` +## Session 模式 + ### 在 YARN 启动一个Flink Session 确保已设置 `HADOOP_CLASSPATH` 环境变量,即可在 YARN 会话启动一个 Flink 任务: @@ -150,4 +147,23 @@ Job Description: Sync MySQL Database to Doris 你可以通过 Flink Web UI 找到一个名为 `Sync MySQL Database to Doris` 的作业。 -请注意,目前还不支持提交至 application 模式集群和 per-job 模式集群。 +# Yarn Application 模式 +Yarn Application 模式是 Yarn 集群上运行 Flink 作业的推荐模式。对资源的管理和分配更加灵活,可以更好地利用集群资源。 + +通过Cli将作业提交至 Flink Yarn Application 集群。 +```bash +cd /path/flink-cdc-* +./bin/flink-cdc.sh -t yarn-application -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml +``` +或者从savepoint恢复Flink-CDC作业: +```bash +cd /path/flink-cdc-* +./bin/flink-cdc.sh -t yarn-application -s hdfs:///flink/savepoint-1537 -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml +``` +提交成功将返回如下信息: +```bash +Pipeline has been submitted to cluster. 
+Job ID: application_1728995081590_1254 +Job Description: submit job successful +``` +你可以通过 Yarn Web UI 找到一个application_id为 `application_1728995081590_1254` 的作业。 diff --git a/docs/content/docs/deployment/yarn.md b/docs/content/docs/deployment/yarn.md index 068f9f218..4969ccf22 100644 --- a/docs/content/docs/deployment/yarn.md +++ b/docs/content/docs/deployment/yarn.md @@ -42,9 +42,6 @@ This *Getting Started* section assumes a functional YARN environment, starting f ```bash export HADOOP_CLASSPATH=`hadoop classpath` ``` - -## Session Mode - Flink runs on all UNIX-like environments, i.e. Linux, Mac OS X, and Cygwin (for Windows). You can refer [overview]({{< ref "docs/connectors/pipeline-connectors/overview" >}}) to check supported versions and download [the binary release](https://flink.apache.org/downloads/) of Flink, then extract the archive: @@ -59,6 +56,8 @@ You should set `FLINK_HOME` environment variables like: export FLINK_HOME=/path/flink-* ``` +## Session Mode + ### Starting a Flink Session on YARN Once you've made sure that the `HADOOP_CLASSPATH` environment variable is set, you can launch a Flink on YARN session: @@ -150,4 +149,24 @@ Job Description: Sync MySQL Database to Doris You can find a job named `Sync MySQL Database to Doris` running through Flink Web UI. -Please note that submitting to application mode cluster and per-job mode cluster are not supported for now. +# Yarn Application Mode +The Yarn Application mode is the recommended approach for running Flink jobs on a Yarn cluster. It offers more flexible resource management and allocation, enabling better utilization of cluster resources. + +To submit a job to a Flink Yarn Application cluster using the CLI: +```bash +cd /path/flink-cdc-* +./bin/flink-cdc.sh -t yarn-application -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml +```` +Or resuming Flink-CDC job from Savepoint: +```bash +cd /path/flink-cdc-* +./bin/flink-cdc.sh -t yarn-application -s hdfs:///flink/savepoint-1537 -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml +``` +After successful submission, the return information is as follows: +```bash +Pipeline has been submitted to cluster. +Job ID: application_1728995081590_1254 +Job Description: submit job successful +``` +You can find a application_id `application_1728995081590_1254` running through Yarn Web UI. + diff --git a/flink-cdc-cli/pom.xml b/flink-cdc-cli/pom.xml index c4ec22bbc..922c0f384 100644 --- a/flink-cdc-cli/pom.xml +++ b/flink-cdc-cli/pom.xml @@ -63,6 +63,13 @@ limitations under the License. 
<scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-yarn</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java index 50107da11..ce7d973e2 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java @@ -19,85 +19,103 @@ package org.apache.flink.cdc.cli; import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser; import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser; -import org.apache.flink.cdc.cli.utils.ConfigurationUtils; -import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.PipelineComposer; import org.apache.flink.cdc.composer.PipelineDeploymentExecutor; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; -import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment; +import org.apache.flink.cdc.composer.flink.deployment.K8SApplicationDeploymentExecutor; +import org.apache.flink.cdc.composer.flink.deployment.YarnApplicationDeploymentExecutor; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.commons.cli.CommandLine; -import java.nio.file.Path; import java.util.List; +import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.REMOTE; + /** Executor for doing the composing and submitting logic for {@link CliFrontend}. 
*/ public class CliExecutor { private final Path pipelineDefPath; - private final Configuration flinkConfig; + private final org.apache.flink.configuration.Configuration flinkConfig; private final Configuration globalPipelineConfig; - private final boolean useMiniCluster; private final List<Path> additionalJars; - + private final Path flinkHome; private final CommandLine commandLine; - private PipelineComposer composer = null; - private final SavepointRestoreSettings savepointSettings; - public CliExecutor( CommandLine commandLine, Path pipelineDefPath, - Configuration flinkConfig, + org.apache.flink.configuration.Configuration flinkConfig, Configuration globalPipelineConfig, - boolean useMiniCluster, List<Path> additionalJars, - SavepointRestoreSettings savepointSettings) { + Path flinkHome) { this.commandLine = commandLine; this.pipelineDefPath = pipelineDefPath; this.flinkConfig = flinkConfig; this.globalPipelineConfig = globalPipelineConfig; - this.useMiniCluster = useMiniCluster; this.additionalJars = additionalJars; - this.savepointSettings = savepointSettings; + this.flinkHome = flinkHome; } public PipelineExecution.ExecutionInfo run() throws Exception { // Create Submit Executor to deployment flink cdc job Or Run Flink CDC Job - boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine); - if (isDeploymentMode) { - ComposeDeploymentFactory composeDeploymentFactory = new ComposeDeploymentFactory(); - PipelineDeploymentExecutor composeExecutor = - composeDeploymentFactory.getFlinkComposeExecutor(commandLine); - org.apache.flink.configuration.Configuration configuration = - org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()); - SavepointRestoreSettings.toConfiguration(savepointSettings, configuration); - return composeExecutor.deploy(commandLine, configuration, additionalJars); - } else { - // Run CDC Job And Parse pipeline definition file - PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser(); - PipelineDef pipelineDef = - pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig); - // Create composer - PipelineComposer composer = getComposer(); - // Compose pipeline - PipelineExecution execution = composer.compose(pipelineDef); - // Execute or submit the pipeline - return execution.execute(); + String deploymentTargetStr = getDeploymentTarget(); + ComposeDeployment deploymentTarget = + ComposeDeployment.getDeploymentFromName(deploymentTargetStr); + switch (deploymentTarget) { + case KUBERNETES_APPLICATION: + return deployWithApplicationComposer(new K8SApplicationDeploymentExecutor()); + case YARN_APPLICATION: + return deployWithApplicationComposer(new YarnApplicationDeploymentExecutor()); + case LOCAL: + return deployWithComposer(FlinkPipelineComposer.ofMiniCluster()); + case REMOTE: + case YARN_SESSION: + return deployWithComposer( + FlinkPipelineComposer.ofRemoteCluster(flinkConfig, additionalJars)); + default: + throw new IllegalArgumentException( + String.format( + "Deployment target %s is not supported", deploymentTargetStr)); } } - private PipelineComposer getComposer() throws Exception { - if (composer == null) { - return FlinkEnvironmentUtils.createComposer( - useMiniCluster, flinkConfig, additionalJars, savepointSettings); - } - return composer; + private PipelineExecution.ExecutionInfo deployWithApplicationComposer( + PipelineDeploymentExecutor composeExecutor) throws Exception { + return composeExecutor.deploy(commandLine, flinkConfig, additionalJars, flinkHome); + } + + private 
PipelineExecution.ExecutionInfo deployWithComposer(PipelineComposer composer) + throws Exception { + PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = + pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig); + PipelineExecution execution = composer.compose(pipelineDef); + return execution.execute(); + } + + @VisibleForTesting + public PipelineExecution.ExecutionInfo deployWithNoOpComposer() throws Exception { + return deployWithComposer(this.composer); + } + + // The main class for running application mode + public static void main(String[] args) throws Exception { + PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = pipelineDefinitionParser.parse(args[0], new Configuration()); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkPipelineComposer flinkPipelineComposer = + FlinkPipelineComposer.ofApplicationCluster(env); + PipelineExecution execution = flinkPipelineComposer.compose(pipelineDef); + execution.execute(); } @VisibleForTesting @@ -106,7 +124,7 @@ public class CliExecutor { } @VisibleForTesting - public Configuration getFlinkConfig() { + public org.apache.flink.configuration.Configuration getFlinkConfig() { return flinkConfig; } @@ -120,12 +138,7 @@ public class CliExecutor { return additionalJars; } - @VisibleForTesting public String getDeploymentTarget() { - return commandLine.getOptionValue("target"); - } - - public SavepointRestoreSettings getSavepointSettings() { - return savepointSettings; + return flinkConfig.get(DeploymentOptions.TARGET); } } diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java index 40b4061f6..5f327eca2 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java @@ -25,9 +25,13 @@ import org.apache.flink.cdc.common.configuration.ConfigOptions; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.shaded.guava31.com.google.common.base.Joiner; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -36,9 +40,8 @@ import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.lang.reflect.InvocationTargetException; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -49,6 +52,10 @@ import static org.apache.flink.cdc.cli.CliFrontendOptions.FLINK_CONFIG; import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION; import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE; import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION; +import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET; +import static org.apache.flink.cdc.cli.CliFrontendOptions.USE_MINI_CLUSTER; +import static 
org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.LOCAL; +import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.REMOTE; /** The frontend entrypoint for the command-line interface of Flink CDC. */ public class CliFrontend { @@ -86,7 +93,7 @@ public class CliFrontend { "Missing pipeline definition file path in arguments. "); } - Path pipelineDefPath = Paths.get(unparsedArgs.get(0)); + Path pipelineDefPath = new Path(unparsedArgs.get(0)); // Take the first unparsed argument as the pipeline definition file LOG.info("Real Path pipelineDefPath {}", pipelineDefPath); // Global pipeline configuration @@ -94,13 +101,17 @@ public class CliFrontend { // Load Flink environment Path flinkHome = getFlinkHome(commandLine); - Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome); + Configuration configuration = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome); // To override the Flink configuration - overrideFlinkConfiguration(flinkConfig, commandLine); + overrideFlinkConfiguration(configuration, commandLine); + + org.apache.flink.configuration.Configuration flinkConfig = + org.apache.flink.configuration.Configuration.fromMap(configuration.toMap()); // Savepoint SavepointRestoreSettings savepointSettings = createSavepointRestoreSettings(commandLine); + SavepointRestoreSettings.toConfiguration(savepointSettings, flinkConfig); // Additional JARs List<Path> additionalJars = @@ -108,7 +119,7 @@ public class CliFrontend { Optional.ofNullable( commandLine.getOptionValues(CliFrontendOptions.JAR)) .orElse(new String[0])) - .map(Paths::get) + .map(Path::new) .collect(Collectors.toList()); // Build executor @@ -117,13 +128,21 @@ public class CliFrontend { pipelineDefPath, flinkConfig, globalPipelineConfig, - commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER), additionalJars, - savepointSettings); + flinkHome); } private static void overrideFlinkConfiguration( Configuration flinkConfig, CommandLine commandLine) { + + String target = + commandLine.hasOption(USE_MINI_CLUSTER) + ? 
LOCAL.getName() + : commandLine.getOptionValue(TARGET, REMOTE.getName()); + flinkConfig.set( + ConfigOptions.key(DeploymentOptions.TARGET.key()).stringType().defaultValue(target), + target); + Properties properties = commandLine.getOptionProperties(FLINK_CONFIG.getOpt()); LOG.info("Dynamic flink config items found: {}", properties); for (String key : properties.stringPropertyNames()) { @@ -194,14 +213,14 @@ public class CliFrontend { String flinkHomeFromArgs = commandLine.getOptionValue(CliFrontendOptions.FLINK_HOME); if (flinkHomeFromArgs != null) { LOG.debug("Flink home is loaded by command-line argument: {}", flinkHomeFromArgs); - return Paths.get(flinkHomeFromArgs); + return new Path(flinkHomeFromArgs); } // Fallback to environment variable String flinkHomeFromEnvVar = System.getenv(FLINK_HOME_ENV_VAR); if (flinkHomeFromEnvVar != null) { LOG.debug("Flink home is loaded by environment variable: {}", flinkHomeFromEnvVar); - return Paths.get(flinkHomeFromEnvVar); + return new Path(flinkHomeFromEnvVar); } throw new IllegalArgumentException( @@ -214,7 +233,7 @@ public class CliFrontend { // Try to get global config path from command line String globalConfig = commandLine.getOptionValue(CliFrontendOptions.GLOBAL_CONFIG); if (globalConfig != null) { - Path globalConfigPath = Paths.get(globalConfig); + Path globalConfigPath = new Path(globalConfig); LOG.info("Using global config in command line: {}", globalConfigPath); return ConfigurationUtils.loadConfigFile(globalConfigPath); } @@ -223,7 +242,8 @@ public class CliFrontend { String flinkCdcHome = System.getenv(FLINK_CDC_HOME_ENV_VAR); if (flinkCdcHome != null) { Path globalConfigPath = - Paths.get(flinkCdcHome).resolve("conf").resolve("flink-cdc.yaml"); + new Path( + flinkCdcHome, Joiner.on(File.separator).join("conf", "flink-cdc.yaml")); LOG.info("Using global config in FLINK_CDC_HOME: {}", globalConfigPath); return ConfigurationUtils.loadConfigFile(globalConfigPath); } diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java index bfcde27bd..c86002214 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java @@ -19,8 +19,7 @@ package org.apache.flink.cdc.cli.parser; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.definition.PipelineDef; - -import java.nio.file.Path; +import org.apache.flink.core.fs.Path; /** Parsing pipeline definition files and generate {@link PipelineDef}. 
*/ public interface PipelineDefinitionParser { diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index b594aa35b..5d00d5394 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -30,6 +30,9 @@ import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.definition.SourceDef; import org.apache.flink.cdc.composer.definition.TransformDef; import org.apache.flink.cdc.composer.definition.UdfDef; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -37,7 +40,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -103,7 +105,9 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { @Override public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig) throws Exception { - return parse(mapper.readTree(pipelineDefPath.toFile()), globalPipelineConfig); + FileSystem fileSystem = FileSystem.get(pipelineDefPath.toUri()); + FSDataInputStream pipelineInStream = fileSystem.open(pipelineDefPath); + return parse(mapper.readTree(pipelineInStream), globalPipelineConfig); } @Override diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java index af1bc7318..7280c0679 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java @@ -18,18 +18,13 @@ package org.apache.flink.cdc.cli.utils; import org.apache.flink.cdc.common.configuration.Configuration; -import org.apache.flink.client.deployment.executors.LocalExecutor; -import org.apache.flink.client.deployment.executors.RemoteExecutor; +import org.apache.flink.core.fs.Path; -import org.apache.commons.cli.CommandLine; - -import java.nio.file.Path; +import java.io.File; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET; - /** Utilities for handling {@link Configuration}. 
*/ public class ConfigurationUtils { @@ -42,7 +37,8 @@ public class ConfigurationUtils { public static Configuration loadConfigFile(Path configPath, boolean allowDuplicateKeys) throws Exception { Map<String, Object> configMap = - YamlParserUtils.loadYamlFile(configPath.toFile(), allowDuplicateKeys); + YamlParserUtils.loadYamlFile( + new File(configPath.toUri().getPath()), allowDuplicateKeys); return Configuration.fromMap(flattenConfigMap(configMap, "")); } @@ -69,13 +65,6 @@ public class ConfigurationUtils { return flattenedMap; } - public static boolean isDeploymentMode(CommandLine commandLine) { - String target = commandLine.getOptionValue(TARGET); - return target != null - && !target.equalsIgnoreCase(LocalExecutor.NAME) - && !target.equalsIgnoreCase(RemoteExecutor.NAME); - } - public static Class<?> getClaimModeClass() { try { return Class.forName("org.apache.flink.core.execution.RestoreMode"); diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java index 198a80c19..f10b2fa06 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java @@ -18,14 +18,14 @@ package org.apache.flink.cdc.cli.utils; import org.apache.flink.cdc.common.configuration.Configuration; -import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.core.fs.Path; + +import org.apache.flink.shaded.guava31.com.google.common.base.Joiner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Path; -import java.util.List; +import java.io.File; /** Utilities for handling Flink configuration and environment. 
*/ public class FlinkEnvironmentUtils { @@ -36,26 +36,19 @@ public class FlinkEnvironmentUtils { private static final String FLINK_CONF_FILENAME = "config.yaml"; public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception { - Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME); - if (flinkConfPath.toFile().exists()) { + Path flinkConfPath = + new Path( + flinkHome, + Joiner.on(File.separator).join(FLINK_CONF_DIR, FLINK_CONF_FILENAME)); + if (flinkConfPath.getFileSystem().exists(flinkConfPath)) { return ConfigurationUtils.loadConfigFile(flinkConfPath); } else { return ConfigurationUtils.loadConfigFile( - flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME), true); - } - } - - public static FlinkPipelineComposer createComposer( - boolean useMiniCluster, - Configuration flinkConfig, - List<Path> additionalJars, - SavepointRestoreSettings savepointSettings) { - if (useMiniCluster) { - return FlinkPipelineComposer.ofMiniCluster(); + new Path( + flinkHome, + Joiner.on(File.separator) + .join(FLINK_CONF_DIR, LEGACY_FLINK_CONF_FILENAME)), + true); } - org.apache.flink.configuration.Configuration configuration = - org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()); - SavepointRestoreSettings.toConfiguration(savepointSettings, configuration); - return FlinkPipelineComposer.ofRemoteCluster(configuration, additionalJars); } } diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java index 5b8a181e0..b9f42cefc 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.composer.PipelineComposer; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; import org.apache.flink.core.execution.RestoreMode; +import org.apache.flink.core.fs.Path; import org.apache.flink.shaded.guava31.com.google.common.io.Resources; @@ -37,6 +38,9 @@ import java.net.URL; import java.nio.file.Paths; import java.util.Map; +import static org.apache.flink.configuration.StateRecoveryOptions.RESTORE_MODE; +import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE; +import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -101,11 +105,10 @@ class CliFrontendTest { "-cm", "no_claim", "-n"); - assertThat(executor.getSavepointSettings().getRestorePath()) + assertThat(executor.getFlinkConfig().get(SAVEPOINT_PATH)) .isEqualTo(flinkHome() + "/savepoints/savepoint-1"); - assertThat(executor.getSavepointSettings().getRestoreMode()) - .isEqualTo(RestoreMode.NO_CLAIM); - assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue(); + assertThat(executor.getFlinkConfig().get(RESTORE_MODE)).isEqualTo(RestoreMode.NO_CLAIM); + assertThat(executor.getFlinkConfig().get(SAVEPOINT_IGNORE_UNCLAIMED_STATE)).isTrue(); } @Test @@ -119,6 +122,11 @@ class CliFrontendTest { "kubernetes-application", "-n"); assertThat(executor.getDeploymentTarget()).isEqualTo("kubernetes-application"); + + executor = + createExecutor( + pipelineDef(), "--flink-home", flinkHome(), "-t", "yarn-application", "-n"); + 
assertThat(executor.getDeploymentTarget()).isEqualTo("yarn-application"); } @Test @@ -128,7 +136,7 @@ class CliFrontendTest { CliExecutor executor = createExecutor( pipelineDef(), "--flink-home", flinkHome(), "--jar", aJar, "--jar", bJar); - assertThat(executor.getAdditionalJars()).contains(Paths.get(aJar), Paths.get(bJar)); + assertThat(executor.getAdditionalJars()).contains(new Path(aJar), new Path(bJar)); } @Test @@ -142,7 +150,7 @@ class CliFrontendTest { globalPipelineConfig()); NoOpComposer composer = new NoOpComposer(); executor.setComposer(composer); - PipelineExecution.ExecutionInfo executionInfo = executor.run(); + PipelineExecution.ExecutionInfo executionInfo = executor.deployWithNoOpComposer(); assertThat(executionInfo.getId()).isEqualTo("fake-id"); assertThat(executionInfo.getDescription()).isEqualTo("fake-description"); } diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 27cbbb1de..c8f8acd13 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.definition.SourceDef; import org.apache.flink.cdc.composer.definition.TransformDef; import org.apache.flink.cdc.composer.definition.UdfDef; +import org.apache.flink.core.fs.Path; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet; @@ -35,7 +36,6 @@ import org.apache.flink.shaded.guava31.com.google.common.io.Resources; import org.junit.jupiter.api.Test; import java.net.URL; -import java.nio.file.Paths; import java.time.Duration; import java.time.ZoneId; import java.util.Arrays; @@ -61,7 +61,7 @@ class YamlPipelineDefinitionParserTest { void testParsingFullDefinition() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml"); YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); - PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration()); assertThat(pipelineDef).isEqualTo(fullDef); } @@ -69,7 +69,7 @@ class YamlPipelineDefinitionParserTest { void testParsingNecessaryOnlyDefinition() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-with-optional.yaml"); YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); - PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration()); assertThat(pipelineDef).isEqualTo(defWithOptional); } @@ -77,7 +77,7 @@ class YamlPipelineDefinitionParserTest { void testMinimizedDefinition() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml"); YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); - PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration()); assertThat(pipelineDef).isEqualTo(minimizedDef); } @@ -87,7 +87,7 @@ class 
YamlPipelineDefinitionParserTest { YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); PipelineDef pipelineDef = parser.parse( - Paths.get(resource.toURI()), + new Path(resource.toURI()), Configuration.fromMap( ImmutableMap.<String, String>builder() .put("parallelism", "1") @@ -99,7 +99,7 @@ class YamlPipelineDefinitionParserTest { void testEvaluateDefaultLocalTimeZone() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml"); YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); - PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration()); assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)) .isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue()); } @@ -110,7 +110,7 @@ class YamlPipelineDefinitionParserTest { YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); PipelineDef pipelineDef = parser.parse( - Paths.get(resource.toURI()), + new Path(resource.toURI()), Configuration.fromMap( ImmutableMap.<String, String>builder() .put( @@ -131,7 +131,7 @@ class YamlPipelineDefinitionParserTest { YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); PipelineDef pipelineDef = parser.parse( - Paths.get(resource.toURI()), + new Path(resource.toURI()), Configuration.fromMap( ImmutableMap.<String, String>builder() .put(PIPELINE_LOCAL_TIME_ZONE.key(), "Asia/Shanghai") @@ -141,7 +141,7 @@ class YamlPipelineDefinitionParserTest { pipelineDef = parser.parse( - Paths.get(resource.toURI()), + new Path(resource.toURI()), Configuration.fromMap( ImmutableMap.<String, String>builder() .put(PIPELINE_LOCAL_TIME_ZONE.key(), "GMT+08:00") @@ -150,7 +150,7 @@ class YamlPipelineDefinitionParserTest { pipelineDef = parser.parse( - Paths.get(resource.toURI()), + new Path(resource.toURI()), Configuration.fromMap( ImmutableMap.<String, String>builder() .put(PIPELINE_LOCAL_TIME_ZONE.key(), "UTC") @@ -165,7 +165,7 @@ class YamlPipelineDefinitionParserTest { assertThatThrownBy( () -> parser.parse( - Paths.get(resource.toURI()), + new Path(resource.toURI()), Configuration.fromMap( ImmutableMap.<String, String>builder() .put( @@ -185,7 +185,7 @@ class YamlPipelineDefinitionParserTest { URL resource = Resources.getResource("definitions/pipeline-definition-full-with-repsym.yaml"); YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); - PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration()); assertThat(pipelineDef).isEqualTo(fullDefWithRouteRepSym); } @@ -193,7 +193,7 @@ class YamlPipelineDefinitionParserTest { void testUdfDefinition() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-with-udf.yaml"); YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); - PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration()); assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf); } diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java index 5da49ec5d..078830f3a 100644 --- 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.cli.utils; import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.ConfigOptions; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.shaded.curator5.com.google.common.io.Resources; @@ -28,8 +29,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.net.URL; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -72,7 +71,7 @@ class ConfigurationUtilsTest { @ValueSource(strings = {"flink-home/conf/config.yaml", "flink-home/conf/flink-conf.yaml"}) void loadConfigFile(String resourcePath) throws Exception { URL resource = Resources.getResource(resourcePath); - Path path = Paths.get(resource.toURI()); + Path path = new Path(resource.toURI()); Configuration configuration = ConfigurationUtils.loadConfigFile(path, resourcePath.endsWith("flink-conf.yaml")); Map<String, String> configMap = configuration.toMap(); diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml index 5971f3b45..0924951d0 100644 --- a/flink-cdc-composer/pom.xml +++ b/flink-cdc-composer/pom.xml @@ -61,6 +61,12 @@ limitations under the License. <artifactId>flink-kubernetes</artifactId> <version>${flink.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-yarn</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cdc-pipeline-udf-examples</artifactId> diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java index 37d573e6b..b3b02ff7d 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java @@ -18,16 +18,19 @@ package org.apache.flink.cdc.composer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.commons.cli.CommandLine; -import java.nio.file.Path; import java.util.List; /** PipelineDeploymentExecutor to execute flink cdc job from different target. 
*/ public interface PipelineDeploymentExecutor { PipelineExecution.ExecutionInfo deploy( - CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) + CommandLine commandLine, + Configuration flinkConfig, + List<Path> additionalJars, + Path flinkHome) throws Exception; } diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index 533868c56..9a1e0bfdd 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -35,14 +35,13 @@ import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; import org.apache.flink.cdc.composer.flink.translator.TransformTranslator; import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent; import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; -import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.net.URI; import java.net.URL; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; import java.util.HashSet; import java.util.List; @@ -58,16 +57,13 @@ public class FlinkPipelineComposer implements PipelineComposer { public static FlinkPipelineComposer ofRemoteCluster( org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) { - org.apache.flink.configuration.Configuration effectiveConfiguration = - new org.apache.flink.configuration.Configuration(); - // Use "remote" as the default target - effectiveConfiguration.set(DeploymentOptions.TARGET, "remote"); - effectiveConfiguration.addAll(flinkConfig); - StreamExecutionEnvironment env = new StreamExecutionEnvironment(effectiveConfiguration); + StreamExecutionEnvironment env = new StreamExecutionEnvironment(flinkConfig); additionalJars.forEach( jarPath -> { try { - FlinkEnvironmentUtils.addJar(env, jarPath.toUri().toURL()); + FlinkEnvironmentUtils.addJar( + env, + jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL()); } catch (Exception e) { throw new RuntimeException( String.format( @@ -79,6 +75,10 @@ public class FlinkPipelineComposer implements PipelineComposer { return new FlinkPipelineComposer(env, false); } + public static FlinkPipelineComposer ofApplicationCluster(StreamExecutionEnvironment env) { + return new FlinkPipelineComposer(env, false); + } + public static FlinkPipelineComposer ofMiniCluster() { return new FlinkPipelineComposer( StreamExecutionEnvironment.getExecutionEnvironment(), true); diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeployment.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeployment.java new file mode 100644 index 000000000..5541941f6 --- /dev/null +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeployment.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.flink.deployment; + +import java.util.Arrays; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + +/** Create deployment methods corresponding to different goals. */ +public enum ComposeDeployment { + YARN_SESSION("yarn-session"), + YARN_APPLICATION("yarn-application"), + LOCAL("local"), + REMOTE("remote"), + KUBERNETES_APPLICATION("kubernetes-application"); + + private final String name; + + ComposeDeployment(final String name) { + this.name = checkNotNull(name); + } + + public String getName() { + return name; + } + + public static ComposeDeployment getDeploymentFromName(final String deploymentTargetStr) { + return Arrays.stream(ComposeDeployment.values()) + .filter(d -> d.name.equalsIgnoreCase(deploymentTargetStr)) + .findFirst() + .orElseThrow( + () -> + new IllegalArgumentException( + "Unknown deployment target \"" + + deploymentTargetStr + + "\"." + + " The available options are: " + + options())); + } + + private static String options() { + return Arrays.stream(ComposeDeployment.values()) + .map(ComposeDeployment::getName) + .collect(Collectors.joining(",")); + } +} diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java deleted file mode 100644 index 27a005721..000000000 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.composer.flink.deployment; - -import org.apache.flink.cdc.composer.PipelineDeploymentExecutor; - -import org.apache.commons.cli.CommandLine; - -/** Create deployment methods corresponding to different goals. 
*/ -public class ComposeDeploymentFactory { - - public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine) { - String target = commandLine.getOptionValue("target"); - if (target.equalsIgnoreCase("kubernetes-application")) { - return new K8SApplicationDeploymentExecutor(); - } - throw new IllegalArgumentException( - String.format("Deployment target %s is not supported", target)); - } -} diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java index 19fcdb262..4a23d05cc 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java @@ -24,32 +24,33 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.fs.Path; import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.KubernetesClusterDescriptor; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; -import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.commons.cli.CommandLine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Path; import java.util.ArrayList; import java.util.List; /** deploy flink cdc job by native k8s application mode. 
*/ public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecutor { - private static final Logger LOG = LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class); + private static final String APPLICATION_MAIN_CLASS = "org.apache.flink.cdc.cli.CliExecutor"; + @Override public PipelineExecution.ExecutionInfo deploy( - CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) { + CommandLine commandLine, + Configuration flinkConfig, + List<Path> additionalJars, + Path flinkHome) { LOG.info("Submitting application in 'Flink K8S Application Mode'."); - flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName()); List<String> jars = new ArrayList<>(); if (flinkConfig.get(PipelineOptions.JARS) == null) { // must be added cdc dist jar by default docker container path @@ -59,9 +60,7 @@ public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecu // set the default cdc latest docker image flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink/flink-cdc:latest"); flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, commandLine.getArgList()); - flinkConfig.set( - ApplicationConfiguration.APPLICATION_MAIN_CLASS, - "org.apache.flink.cdc.cli.CliFrontend"); + flinkConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, APPLICATION_MAIN_CLASS); KubernetesClusterClientFactory kubernetesClusterClientFactory = new KubernetesClusterClientFactory(); KubernetesClusterDescriptor descriptor = diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/YarnApplicationDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/YarnApplicationDeploymentExecutor.java new file mode 100644 index 000000000..16b9b1501 --- /dev/null +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/YarnApplicationDeploymentExecutor.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.composer.flink.deployment; + +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.composer.PipelineDeploymentExecutor; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.yarn.YarnClusterClientFactory; +import org.apache.flink.yarn.YarnClusterDescriptor; +import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.configuration.YarnLogConfigUtil; + +import org.apache.flink.shaded.guava31.com.google.common.base.Joiner; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Deploy flink cdc job by yarn application mode. */ +public class YarnApplicationDeploymentExecutor implements PipelineDeploymentExecutor { + private static final Logger LOG = + LoggerFactory.getLogger(YarnApplicationDeploymentExecutor.class); + + private static final String FLINK_CDC_HOME_ENV_VAR = "FLINK_CDC_HOME"; + private static final String FLINK_CDC_DIST_JAR_PATTERN = + "^flink-cdc-dist-(\\d+(\\.\\d+)*)(-SNAPSHOT)?\\.jar$"; + private static final String APPLICATION_MAIN_CLASS = "org.apache.flink.cdc.cli.CliExecutor"; + + @Override + public PipelineExecution.ExecutionInfo deploy( + CommandLine commandLine, + Configuration flinkConfig, + List<Path> additionalJars, + Path flinkHome) + throws Exception { + LOG.info("Submitting application in 'Flink Yarn Application Mode'."); + if (flinkConfig.get(PipelineOptions.JARS) == null) { + flinkConfig.set( + PipelineOptions.JARS, Collections.singletonList(getFlinkCDCDistJarFromEnv())); + } + flinkConfig.set( + YarnConfigOptions.SHIP_FILES, + additionalJars.stream().map(Path::toString).collect(Collectors.toList())); + + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + + Path pipelinePath = new Path(commandLine.getArgList().get(0)); + FileSystem fileSystem = FileSystem.get(pipelinePath.toUri()); + FSDataInputStream pipelineInStream = fileSystem.open(pipelinePath); + + flinkConfig.set( + ApplicationConfiguration.APPLICATION_ARGS, + Collections.singletonList(mapper.readTree(pipelineInStream).toString())); + YarnLogConfigUtil.setLogConfigFileInConfig( + flinkConfig, Joiner.on(File.separator).join(flinkHome, "conf")); + + flinkConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, APPLICATION_MAIN_CLASS); + final YarnClusterClientFactory yarnClusterClientFactory = new YarnClusterClientFactory(); + final YarnClusterDescriptor descriptor = + 
yarnClusterClientFactory.createClusterDescriptor(flinkConfig); + ClusterSpecification specification = + yarnClusterClientFactory.getClusterSpecification(flinkConfig); + ApplicationConfiguration applicationConfiguration = + ApplicationConfiguration.fromConfiguration(flinkConfig); + + ClusterClient<ApplicationId> client = null; + try { + ClusterClientProvider<ApplicationId> clusterClientProvider = + descriptor.deployApplicationCluster(specification, applicationConfiguration); + client = clusterClientProvider.getClusterClient(); + ApplicationId clusterId = client.getClusterId(); + LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId); + return new PipelineExecution.ExecutionInfo( + clusterId.toString(), "submit job successful"); + } catch (Exception e) { + if (client != null) { + client.shutDownCluster(); + } + throw new RuntimeException("Failed to deploy Flink CDC job", e); + } finally { + descriptor.close(); + if (client != null) { + client.close(); + } + } + } + + private String getFlinkCDCDistJarFromEnv() throws IOException { + String flinkCDCHomeFromEnvVar = System.getenv(FLINK_CDC_HOME_ENV_VAR); + Preconditions.checkNotNull( + flinkCDCHomeFromEnvVar, + "FLINK_CDC_HOME is not correctly set in environment variable, current FLINK_CDC_HOME is: " + + FLINK_CDC_HOME_ENV_VAR); + Path flinkCDCLibPath = new Path(flinkCDCHomeFromEnvVar, "lib"); + if (!flinkCDCLibPath.getFileSystem().exists(flinkCDCLibPath) + || !flinkCDCLibPath.getFileSystem().getFileStatus(flinkCDCLibPath).isDir()) { + throw new RuntimeException( + "Flink cdc home lib is not file or not directory: " + + flinkCDCLibPath.makeQualified(flinkCDCLibPath.getFileSystem())); + } + + FileStatus[] fileStatuses = flinkCDCLibPath.getFileSystem().listStatus(flinkCDCLibPath); + Optional<Path> distJars = + Arrays.stream(fileStatuses) + .filter(status -> !status.isDir()) + .map(FileStatus::getPath) + .filter(path -> path.getName().matches(FLINK_CDC_DIST_JAR_PATTERN)) + .findFirst(); + + if (distJars.isPresent()) { + Path path = distJars.get().makeQualified(distJars.get().getFileSystem()); + return path.toString(); + } else { + throw new FileNotFoundException( + "Failed to fetch Flink CDC dist jar from path: " + flinkCDCLibPath); + } + } +} diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentTest.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentTest.java new file mode 100644 index 000000000..be207736c --- /dev/null +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.composer.flink.deployment; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Test for {@link ComposeDeployment}. */ +public class ComposeDeploymentTest { + @Test + public void testComposeDeployment() throws Exception { + Assertions.assertThat(ComposeDeployment.getDeploymentFromName("yarn-application")) + .as("test yarn-application") + .isEqualTo(ComposeDeployment.YARN_APPLICATION); + + Assertions.assertThat(ComposeDeployment.getDeploymentFromName("yarn-Application")) + .as("test ignore case") + .isEqualTo(ComposeDeployment.YARN_APPLICATION); + + Assertions.assertThatThrownBy(() -> ComposeDeployment.getDeploymentFromName("unKnown")) + .as("test Unknown deployment target") + .hasMessage( + "Unknown deployment target \"unKnown\". The available options are: yarn-session,yarn-application,local,remote,kubernetes-application"); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java index 7923bb9b9..1ccfd441a 100644 --- a/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java +++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java @@ -22,9 +22,8 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.List; +import java.util.Optional; import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.stream.Stream; /** General test utilities. */ @@ -58,30 +57,23 @@ public class TestUtils { } try (Stream<Path> dependencyResources = Files.walk(path)) { - final List<Path> matchingResources = + Optional<Path> jarPath = dependencyResources .filter( jar -> Pattern.compile(resourceNameRegex) .matcher(jar.toAbsolutePath().toString()) .find()) - .collect(Collectors.toList()); - switch (matchingResources.size()) { - case 0: - throw new RuntimeException( - new FileNotFoundException( - String.format( - "No resource file could be found that matches the pattern %s. " - + "This could mean that the test module must be rebuilt via maven.", - resourceNameRegex))); - case 1: - return matchingResources.get(0); - default: - throw new RuntimeException( - new IOException( - String.format( - "Multiple resource files were found matching the pattern %s. Matches=%s", - resourceNameRegex, matchingResources))); + .findFirst(); + if (jarPath.isPresent()) { + return jarPath.get(); + } else { + throw new RuntimeException( + new FileNotFoundException( + String.format( + "No resource file could be found that matches the pattern %s. " + + "This could mean that the test module must be rebuilt via maven.", + resourceNameRegex))); } } catch (final IOException ioe) { throw new RuntimeException("Could not search for resource resource files.", ioe); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 47f4217cd..47bea3d27 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -36,6 +36,11 @@ limitations under the License. <!-- TODO: Update this, when StarRocks releases a 1.20 compatible connector. 
--> <starrocks.connector.version>1.2.10_flink-${flink-major-1.19}</starrocks.connector.version> <paimon.version>1.0.1</paimon.version> + <flink.hadoop.version>3.3.4</flink.hadoop.version> + <flink.release.download.skip>false</flink.release.download.skip> + <flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name> + <flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror> + <maven.plugin.download.version>1.6.8</maven.plugin.download.version> </properties> <dependencies> @@ -108,6 +113,12 @@ limitations under the License. <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -128,12 +139,24 @@ limitations under the License. <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cdc-pipeline-connector-maxcompute</artifactId> <version>${project.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -182,6 +205,122 @@ limitations under the License. <version>${testcontainers.version}</version> <scope>test</scope> </dependency> + + <!-- mini yarn --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <version>${flink.hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <!-- This dependency is no longer shipped with the JDK since Java 9.--> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.reload4j</groupId> + <artifactId>reload4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-reload4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${flink.hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <version>${flink.hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>jdk.tools</artifactId> + <groupId>jdk.tools</groupId> + </exclusion> + <exclusion> + <artifactId>log4j</artifactId> + <groupId>log4j</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <version>${flink.hadoop.version}</version> + <scope>test</scope> + <type>test-jar</type> + <exclusions> + <exclusion> + 
<artifactId>jdk.tools</artifactId> + <groupId>jdk.tools</groupId> + </exclusion> + <exclusion> + <artifactId>log4j</artifactId> + <groupId>log4j</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${flink.hadoop.version}</version> + <scope>test</scope> + <type>test-jar</type> + <exclusions> + <exclusion> + <artifactId>jdk.tools</artifactId> + <groupId>jdk.tools</groupId> + </exclusion> + <exclusion> + <artifactId>log4j</artifactId> + <groupId>log4j</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-yarn</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -225,6 +364,31 @@ limitations under the License. </execution> </executions> </plugin> + <plugin> + <groupId>com.googlecode.maven-download-plugin</groupId> + <artifactId>download-maven-plugin</artifactId> + <version>1.6.8</version> + <configuration> + <cacheDirectory>${maven.plugin.download.cache.path}</cacheDirectory> + <outputDirectory>${project.build.directory}</outputDirectory> + <readTimeOut>60000</readTimeOut> + <retries>3</retries> + <unpack>true</unpack> + </configuration> + <executions> + <execution> + <id>download-flink-release</id> + <goals> + <goal>wget</goal> + </goals> + <phase>compile</phase> + <configuration> + <skip>${flink.release.download.skip}</skip> + <url>${flink.release.mirror}/${flink.release.name}</url> + </configuration> + </execution> + </executions> + </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -237,6 +401,17 @@ limitations under the License. <goal>copy</goal> </goals> </execution> + <execution> + <id>store-classpath-in-target-for-tests</id> + <phase>process-test-resources</phase> + <goals> + <goal>build-classpath</goal> + </goals> + <configuration> + <outputFile>${project.build.directory}/yarn.classpath</outputFile> + <excludeGroupIds>org.apache.flink</excludeGroupIds> + </configuration> + </execution> </executions> <configuration> <artifactItems> diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eWithYarnApplicationITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eWithYarnApplicationITCase.java new file mode 100644 index 000000000..cab98c6af --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eWithYarnApplicationITCase.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestOnYarnEnvironment; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** End-to-end tests for mysql cdc pipeline job. */ +public class MysqlE2eWithYarnApplicationITCase extends PipelineTestOnYarnEnvironment { + private static final Logger LOG = + LoggerFactory.getLogger(MysqlE2eWithYarnApplicationITCase.class); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; + + @Container + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(Network.newNetwork()) + .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase mysqlInventoryDatabase = + new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @BeforeAll + public static void setup() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL)).join(); + LOG.info("Containers are started."); + LOG.info("Starting up MiniYARNCluster..."); + startMiniYARNCluster(); + LOG.info("MiniYARNCluster are started."); + } + + @BeforeEach + public void before() throws Exception { + mysqlInventoryDatabase.createAndInitialize(); + } + + @AfterEach + public void after() { + mysqlInventoryDatabase.dropDatabase(); + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: %S\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: 
UTC\n" + + " scan.startup.mode: snapshot\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + 1); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + String applicationId = + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + LOG.info("Pipeline job is running"); + validateResult( + applicationId, + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}"); + } + + private void validateResult(String applicationId, String... 
expectedEvents) {
+        String dbName = mysqlInventoryDatabase.getDatabaseName();
+        List<String> expectedEventsList =
+                Arrays.stream(expectedEvents)
+                        .map(event -> String.format(event, dbName, dbName))
+                        .collect(Collectors.toList());
+        List<String> taskManagerOutContent = getTaskManagerOutContent(applicationId);
+        assertThat(taskManagerOutContent).containsExactlyInAnyOrderElementsOf(expectedEventsList);
+    }
+
+    public static List<String> getTaskManagerOutContent(String applicationId) {
+        Path resource =
+                TestUtils.getResource(
+                        YARN_CONFIGURATION.get(PipelineTestOnYarnEnvironment.TEST_CLUSTER_NAME_KEY),
+                        "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/target");
+        try (Stream<Path> taskManagerOutFilePath = Files.walk(resource)) {
+            Optional<File> taskManagerOutFile =
+                    taskManagerOutFilePath
+                            .filter(
+                                    path ->
+                                            path.getFileName().toString().equals("taskmanager.out")
+                                                    && path.toString().contains(applicationId))
+                            .map(Path::toFile)
+                            .findFirst();
+
+            if (taskManagerOutFile.isPresent()) {
+                return FileUtils.readLines(taskManagerOutFile.get(), Charset.defaultCharset());
+            } else {
+                throw new FileNotFoundException(
+                        String.format("taskmanager.out does not exist for %s", applicationId));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Could not search the %s directory for taskmanager.out.",
+                            YARN_CONFIGURATION.get(
+                                    PipelineTestOnYarnEnvironment.TEST_CLUSTER_NAME_KEY)), e);
+        }
+    }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestOnYarnEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestOnYarnEnvironment.java
new file mode 100644
index 000000000..e9998ad40
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestOnYarnEnvironment.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.cdc.pipeline.tests.utils; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static java.lang.Thread.sleep; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.fail; + +/** Test environment running pipeline job on YARN mini-cluster. */ +public class PipelineTestOnYarnEnvironment extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(PipelineTestOnYarnEnvironment.class); + + protected static final YarnConfiguration YARN_CONFIGURATION; + private YarnClient yarnClient = null; + protected static MiniYARNCluster yarnCluster = null; + + protected static final String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name"; + protected static final int NUM_NODEMANAGERS = 2; + + protected static File yarnSiteXML = null; + + @TempDir Path temporaryFolder; + + private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(120); + private static final int sleepIntervalInMS = 100; + + // copy from org.apache.flink.yarn.YarnTestBase + static { + YARN_CONFIGURATION = new YarnConfiguration(); + YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32); + YARN_CONFIGURATION.setInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + 4096); // 4096 is the available memory anyways + YARN_CONFIGURATION.setBoolean( + YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); + YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + YARN_CONFIGURATION.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); + YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4); + YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); + YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); + YARN_CONFIGURATION.setInt( + YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. + // so we have to change the number of cores for testing. 
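+        // The overrides below relax the NodeManager disk-utilization check, point
+        // YARN_APPLICATION_CLASSPATH at the classpath collected in target/yarn.classpath,
+        // shorten ResourceManager connect retries, and name the test cluster so the
+        // MiniYARNCluster starts quickly and predictably during tests.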
+ YARN_CONFIGURATION.setFloat( + YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0F); + YARN_CONFIGURATION.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, getYarnClasspath()); + YARN_CONFIGURATION.setInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 1000); + YARN_CONFIGURATION.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 5000); + YARN_CONFIGURATION.set(TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-application"); + } + + @BeforeEach + public void setupYarnClient() throws Exception { + if (yarnClient == null) { + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(getYarnConfiguration()); + yarnClient.start(); + } + } + + @AfterEach + public void shutdownYarnClient() { + yarnClient.stop(); + } + + @AfterAll + public static void teardown() { + + if (yarnCluster != null) { + LOG.info("Stopping MiniYarn Cluster"); + yarnCluster.stop(); + yarnCluster = null; + } + + // Unset FLINK_CONF_DIR, as it might change the behavior of other tests + Map<String, String> map = new HashMap<>(System.getenv()); + map.remove(ConfigConstants.ENV_FLINK_CONF_DIR); + map.remove("YARN_CONF_DIR"); + map.remove("IN_TESTS"); + CommonTestUtils.setEnv(map); + + if (yarnSiteXML != null) { + yarnSiteXML.delete(); + } + } + + protected static YarnConfiguration getYarnConfiguration() { + return YARN_CONFIGURATION; + } + + public static void startMiniYARNCluster() { + try { + if (yarnCluster == null) { + final String testName = + YARN_CONFIGURATION.get(PipelineTestOnYarnEnvironment.TEST_CLUSTER_NAME_KEY); + yarnCluster = + new MiniYARNCluster( + testName == null ? "YarnTest_" + UUID.randomUUID() : testName, + NUM_NODEMANAGERS, + 1, + 1); + + yarnCluster.init(YARN_CONFIGURATION); + yarnCluster.start(); + } + + File targetTestClassesFolder = new File("target/test-classes"); + writeYarnSiteConfigXML(YARN_CONFIGURATION, targetTestClassesFolder); + + Map<String, String> map = new HashMap<String, String>(System.getenv()); + map.put( + "IN_TESTS", + "yes we are in tests"); // see YarnClusterDescriptor() for more infos + map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath()); + map.put("MAX_LOG_FILE_NUMBER", "10"); + CommonTestUtils.setEnv(map); + + assertThat(yarnCluster.getServiceState()).isEqualTo(Service.STATE.STARTED); + // wait for the NodeManagers to connect + while (!yarnCluster.waitForNodeManagersToConnect(500)) { + LOG.info("Waiting for NodeManagers to connect"); + } + } catch (Exception e) { + fail("Starting MiniYARNCluster failed: ", e); + } + } + + // write yarn-site.xml to target/test-classes so that flink pick can pick up this when + // initializing YarnClient properly from classpath + public static void writeYarnSiteConfigXML(Configuration yarnConf, File targetFolder) + throws IOException { + yarnSiteXML = new File(targetFolder, "/yarn-site.xml"); + try (FileWriter writer = new FileWriter(yarnSiteXML)) { + yarnConf.writeXml(writer); + writer.flush(); + } + } + + public String submitPipelineJob(String pipelineJob, Path... 
jars) throws Exception { + ProcessBuilder processBuilder = new ProcessBuilder(); + Map<String, String> env = getEnv(); + processBuilder.environment().putAll(env); + Path yamlScript = temporaryFolder.resolve("mysql-to-values.yml"); + Files.write(yamlScript, pipelineJob.getBytes()); + + List<String> commandList = new ArrayList<>(); + commandList.add(env.get("FLINK_CDC_HOME") + "/bin/flink-cdc.sh"); + commandList.add("-t"); + commandList.add("yarn-application"); + commandList.add(yamlScript.toAbsolutePath().toString()); + for (Path jar : jars) { + commandList.add("--jar"); + commandList.add(jar.toString()); + } + + processBuilder.command(commandList); + LOG.info("starting flink-cdc task with flink on yarn-application"); + Process process = processBuilder.start(); + process.waitFor(); + String applicationIdStr = getApplicationId(process); + Preconditions.checkNotNull( + applicationIdStr, "applicationId should not be null, please check logs"); + ApplicationId applicationId = ApplicationId.fromString(applicationIdStr); + waitApplicationFinished(applicationId, yarnAppTerminateTimeout, sleepIntervalInMS); + LOG.info("started flink-cdc task with flink on yarn-application"); + return applicationIdStr; + } + + public Map<String, String> getEnv() { + Path flinkHome = + TestUtils.getResource( + "flink-\\d+(\\.\\d+)*$", + "flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/target"); + Map<String, String> env = new HashMap<>(); + env.put("FLINK_HOME", flinkHome.toString()); + env.put("FLINK_CONF_DIR", flinkHome.resolve("conf").toString()); + addFlinkConf(flinkHome.resolve("conf").resolve("config.yaml")); + Path flinkcdcHome = + TestUtils.getResource("flink-cdc-\\d+(\\.\\d+)*(-SNAPSHOT)?$", "flink-cdc-dist"); + env.put("FLINK_CDC_HOME", flinkcdcHome.toString()); + env.put("HADOOP_CLASSPATH", getYarnClasspath()); + return env; + } + + public void addFlinkConf(Path flinkConf) { + Map<String, String> configToAppend = new HashMap<>(); + configToAppend.put("akka.ask.timeout", "100s"); + configToAppend.put("web.timeout", "1000000"); + configToAppend.put("taskmanager.slot.timeout", "1000s"); + configToAppend.put("slot.request.timeout", "120000"); + try { + if (!Files.exists(flinkConf)) { + throw new FileNotFoundException("config.yaml not found at " + flinkConf); + } + List<String> lines = new ArrayList<>(Files.readAllLines(flinkConf)); + for (Map.Entry<String, String> entry : configToAppend.entrySet()) { + lines.add(entry.getKey() + ": " + entry.getValue()); + } + Files.write( + flinkConf, + lines, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING); + } catch (IOException e) { + throw new RuntimeException("Failed to append configuration to config.yaml", e); + } + } + + public String getApplicationId(Process process) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + String line; + while ((line = reader.readLine()) != null) { + LOG.info(line); + if (line.startsWith("Job ID")) { + return line.split(":")[1].trim(); + } + } + return null; + } + + protected void waitApplicationFinished( + ApplicationId applicationId, Duration timeout, int sleepIntervalInMS) throws Exception { + Deadline deadline = Deadline.now().plus(timeout); + YarnApplicationState state = + getYarnClient().getApplicationReport(applicationId).getYarnApplicationState(); + + while (state != YarnApplicationState.FINISHED) { + if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { + fail("Application became FAILED or KILLED while expecting 
FINISHED"); + } + + if (deadline.isOverdue()) { + getYarnClient().killApplication(applicationId); + fail("Application didn't finish before timeout"); + } + + sleep(sleepIntervalInMS); + state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState(); + } + } + + @Nullable + protected YarnClient getYarnClient() { + return yarnClient; + } + + /** + * Searches for the yarn.classpath file generated by the "dependency:build-classpath" maven + * plugin in "flink-yarn-tests". + * + * @return a classpath suitable for running all YARN-launched JVMs + */ + private static String getYarnClasspath() { + Path yarnClasspathFile = TestUtils.getResource("yarn.classpath"); + try { + return FileUtils.readFileToString(yarnClasspathFile.toFile(), StandardCharsets.UTF_8); + } catch (Throwable t) { + LOG.error( + "Error while getting YARN classpath in {}", + yarnClasspathFile.toFile().getAbsoluteFile(), + t); + throw new RuntimeException("Error while getting YARN classpath", t); + } + } +}