This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new d6b687b61 [FLINK-34853] Submit CDC Job To Flink K8S Native Application
Mode (#3093)
d6b687b61 is described below
commit d6b687b61dd1701f9c7d2c5bc4446a88e1641d2b
Author: ConradJam <[email protected]>
AuthorDate: Thu Aug 8 22:35:52 2024 +0800
[FLINK-34853] Submit CDC Job To Flink K8S Native Application Mode (#3093)
---
Dockerfile | 35 ++++++++
flink-cdc-cli/pom.xml | 10 ++-
.../java/org/apache/flink/cdc/cli/CliExecutor.java | 51 ++++++++----
.../java/org/apache/flink/cdc/cli/CliFrontend.java | 11 +--
.../apache/flink/cdc/cli/CliFrontendOptions.java | 12 +++
.../flink/cdc/cli/utils/ConfigurationUtils.java | 13 +++
.../org/apache/flink/cdc/cli/CliFrontendTest.java | 17 ++++
flink-cdc-composer/pom.xml | 7 +-
.../cdc/composer/PipelineDeploymentExecutor.java | 33 ++++++++
.../flink/deployment/ComposeDeploymentFactory.java | 35 ++++++++
.../K8SApplicationDeploymentExecutor.java | 93 ++++++++++++++++++++++
.../cdc/composer/utils/FactoryDiscoveryUtils.java | 9 ++-
12 files changed, 301 insertions(+), 25 deletions(-)
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 000000000..d0deb5a29
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,35 @@
+#/*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+FROM flink
+
+ARG FLINK_CDC_VERSION=3.2-SNAPSHOT
+ARG PIPELINE_DEFINITION_FILE
+
+RUN mkdir -p /opt/flink-cdc
+RUN mkdir -p /opt/flink/usrlib
+ENV FLINK_CDC_HOME /opt/flink-cdc
+COPY flink-cdc-dist/target/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz /tmp/
+RUN tar -xzvf /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz -C /tmp/ && \
+ mv /tmp/flink-cdc-${FLINK_CDC_VERSION}/* /opt/flink-cdc/ && \
+ mv /opt/flink-cdc/lib/flink-cdc-dist-${FLINK_CDC_VERSION}.jar
/opt/flink-cdc/lib/flink-cdc-dist.jar && \
+ rm -rf /tmp/flink-cdc-${FLINK_CDC_VERSION}
/tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz
+# copy jars to cdc libs
+COPY
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/target/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar
/opt/flink/usrlib/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar
+COPY
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar
/opt/flink/usrlib/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar
+# copy flink cdc pipeline conf file, Here is an example. Users can replace it
according to their needs.
+COPY $PIPELINE_DEFINITION_FILE $FLINK_CDC_HOME/conf
diff --git a/flink-cdc-cli/pom.xml b/flink-cdc-cli/pom.xml
index 1aa57f336..0cd2d0de2 100644
--- a/flink-cdc-cli/pom.xml
+++ b/flink-cdc-cli/pom.xml
@@ -28,7 +28,7 @@ limitations under the License.
<artifactId>flink-cdc-cli</artifactId>
<properties>
- <commons-cli.version>1.6.0</commons-cli.version>
+ <commons-cli.version>1.7.0</commons-cli.version>
<snakeyaml.version>2.6</snakeyaml.version>
</properties>
@@ -55,6 +55,14 @@ limitations under the License.
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
index 3e76607b2..2452ab59e 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
@@ -19,14 +19,19 @@ package org.apache.flink.cdc.cli;
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.commons.cli.CommandLine;
+
import java.nio.file.Path;
import java.util.List;
@@ -39,17 +44,21 @@ public class CliExecutor {
private final boolean useMiniCluster;
private final List<Path> additionalJars;
+ private final CommandLine commandLine;
+
private PipelineComposer composer = null;
private final SavepointRestoreSettings savepointSettings;
public CliExecutor(
+ CommandLine commandLine,
Path pipelineDefPath,
Configuration flinkConfig,
Configuration globalPipelineConfig,
boolean useMiniCluster,
List<Path> additionalJars,
SavepointRestoreSettings savepointSettings) {
+ this.commandLine = commandLine;
this.pipelineDefPath = pipelineDefPath;
this.flinkConfig = flinkConfig;
this.globalPipelineConfig = globalPipelineConfig;
@@ -59,22 +68,31 @@ public class CliExecutor {
}
public PipelineExecution.ExecutionInfo run() throws Exception {
- // Parse pipeline definition file
- PipelineDefinitionParser pipelineDefinitionParser = new
YamlPipelineDefinitionParser();
- PipelineDef pipelineDef =
- pipelineDefinitionParser.parse(pipelineDefPath,
globalPipelineConfig);
-
- // Create composer
- PipelineComposer composer = getComposer();
-
- // Compose pipeline
- PipelineExecution execution = composer.compose(pipelineDef);
-
- // Execute the pipeline
- return execution.execute();
+ // Create Submit Executor to deployment flink cdc job Or Run Flink CDC
Job
+ boolean isDeploymentMode =
ConfigurationUtils.isDeploymentMode(commandLine);
+ if (isDeploymentMode) {
+ ComposeDeploymentFactory composeDeploymentFactory = new
ComposeDeploymentFactory();
+ PipelineDeploymentExecutor composeExecutor =
+
composeDeploymentFactory.getFlinkComposeExecutor(commandLine);
+ return composeExecutor.deploy(
+ commandLine,
+
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
+ additionalJars);
+ } else {
+ // Run CDC Job And Parse pipeline definition file
+ PipelineDefinitionParser pipelineDefinitionParser = new
YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef =
+ pipelineDefinitionParser.parse(pipelineDefPath,
globalPipelineConfig);
+ // Create composer
+ PipelineComposer composer = getComposer();
+ // Compose pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ // Execute or submit the pipeline
+ return execution.execute();
+ }
}
- private PipelineComposer getComposer() {
+ private PipelineComposer getComposer() throws Exception {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars,
savepointSettings);
@@ -102,6 +120,11 @@ public class CliExecutor {
return additionalJars;
}
+ @VisibleForTesting
+ public String getDeploymentTarget() {
+ return commandLine.getOptionValue("target");
+ }
+
public SavepointRestoreSettings getSavepointSettings() {
return savepointSettings;
}
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
index c67c3cf99..ac746e224 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
@@ -34,8 +34,6 @@ import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -83,13 +81,9 @@ public class CliFrontend {
"Missing pipeline definition file path in arguments. ");
}
- // Take the first unparsed argument as the pipeline definition file
Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
- if (!Files.exists(pipelineDefPath)) {
- throw new FileNotFoundException(
- String.format("Cannot find pipeline definition file
\"%s\"", pipelineDefPath));
- }
-
+ // Take the first unparsed argument as the pipeline definition file
+ LOG.info("Real Path pipelineDefPath {}", pipelineDefPath);
// Global pipeline configuration
Configuration globalPipelineConfig = getGlobalConfig(commandLine);
@@ -111,6 +105,7 @@ public class CliFrontend {
// Build executor
return new CliExecutor(
+ commandLine,
pipelineDefPath,
flinkConfig,
globalPipelineConfig,
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
index 320213285..adf39a40b 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
@@ -46,6 +46,16 @@ public class CliFrontendOptions {
.desc("JARs to be submitted together with the pipeline")
.build();
+ public static final Option TARGET =
+ Option.builder("t")
+ .longOpt("target")
+ .hasArg()
+ .desc(
+ "The deployment target for the execution. This can
take one of the following values "
+ +
"local/remote/yarn-session/yarn-application/kubernetes-session/kubernetes"
+ + "-application")
+ .build();
+
public static final Option USE_MINI_CLUSTER =
Option.builder()
.longOpt("use-mini-cluster")
@@ -91,6 +101,8 @@ public class CliFrontendOptions {
.addOption(FLINK_HOME)
.addOption(GLOBAL_CONFIG)
.addOption(USE_MINI_CLUSTER)
+ .addOption(TARGET)
+ .addOption(USE_MINI_CLUSTER)
.addOption(SAVEPOINT_PATH_OPTION)
.addOption(SAVEPOINT_CLAIM_MODE)
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
index 8bc4ba628..6d2100aa5 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -18,12 +18,18 @@
package org.apache.flink.cdc.cli.utils;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.client.deployment.executors.LocalExecutor;
+import org.apache.flink.client.deployment.executors.RemoteExecutor;
+
+import org.apache.commons.cli.CommandLine;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;
+
/** Utilities for handling {@link Configuration}. */
public class ConfigurationUtils {
@@ -62,4 +68,11 @@ public class ConfigurationUtils {
return flattenedMap;
}
+
+ public static boolean isDeploymentMode(CommandLine commandLine) {
+ String target = commandLine.getOptionValue(TARGET);
+ return target != null
+ && !target.equalsIgnoreCase(LocalExecutor.NAME)
+ && !target.equalsIgnoreCase(RemoteExecutor.NAME);
+ }
}
diff --git
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
index 83ccb1cbe..501b28840 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
@@ -106,6 +106,19 @@ class CliFrontendTest {
assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue();
}
+ @Test
+ void testDeploymentTargetConfiguration() throws Exception {
+ CliExecutor executor =
+ createExecutor(
+ pipelineDef(),
+ "--flink-home",
+ flinkHome(),
+ "-t",
+ "kubernetes-application",
+ "-n");
+
assertThat(executor.getDeploymentTarget()).isEqualTo("kubernetes-application");
+ }
+
@Test
void testAdditionalJar() throws Exception {
String aJar = "/foo/jar/a.jar";
@@ -177,6 +190,10 @@ class CliFrontendTest {
+ " was triggered.\n"
+ " -s,--from-savepoint <arg> Path to a savepoint
to restore the job from\n"
+ " (for example
hdfs:///flink/savepoint-1537\n"
+ + " -t,--target <arg> The deployment
target for the execution. This\n"
+ + " can take one of the
following values\n"
+ + "
local/remote/yarn-session/yarn-application/ku\n"
+ + "
bernetes-session/kubernetes-application\n"
+ " --use-mini-cluster Use Flink
MiniCluster to run the pipeline\n";
private static class NoOpComposer implements PipelineComposer {
diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml
index 12471f032..f9f57fd3c 100644
--- a/flink-cdc-composer/pom.xml
+++ b/flink-cdc-composer/pom.xml
@@ -48,7 +48,7 @@ limitations under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -56,6 +56,11 @@ limitations under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
new file mode 100644
index 000000000..37d573e6b
--- /dev/null
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.commons.cli.CommandLine;
+
+import java.nio.file.Path;
+import java.util.List;
+
+/** PipelineDeploymentExecutor to execute flink cdc job from different target.
*/
+public interface PipelineDeploymentExecutor {
+
+ PipelineExecution.ExecutionInfo deploy(
+ CommandLine commandLine, Configuration flinkConfig, List<Path>
additionalJars)
+ throws Exception;
+}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
new file mode 100644
index 000000000..27a005721
--- /dev/null
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
+
+import org.apache.commons.cli.CommandLine;
+
+/** Create deployment methods corresponding to different goals. */
+public class ComposeDeploymentFactory {
+
+ public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine
commandLine) {
+ String target = commandLine.getOptionValue("target");
+ if (target.equalsIgnoreCase("kubernetes-application")) {
+ return new K8SApplicationDeploymentExecutor();
+ }
+ throw new IllegalArgumentException(
+ String.format("Deployment target %s is not supported",
target));
+ }
+}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
new file mode 100644
index 000000000..19fcdb262
--- /dev/null
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+
+import org.apache.commons.cli.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/** deploy flink cdc job by native k8s application mode. */
+public class K8SApplicationDeploymentExecutor implements
PipelineDeploymentExecutor {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class);
+
+ @Override
+ public PipelineExecution.ExecutionInfo deploy(
+ CommandLine commandLine, Configuration flinkConfig, List<Path>
additionalJars) {
+ LOG.info("Submitting application in 'Flink K8S Application Mode'.");
+ flinkConfig.set(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.APPLICATION.getName());
+ List<String> jars = new ArrayList<>();
+ if (flinkConfig.get(PipelineOptions.JARS) == null) {
+ // must be added cdc dist jar by default docker container path
+ jars.add("local:///opt/flink-cdc/lib/flink-cdc-dist.jar");
+ flinkConfig.set(PipelineOptions.JARS, jars);
+ }
+ // set the default cdc latest docker image
+ flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE,
"flink/flink-cdc:latest");
+ flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS,
commandLine.getArgList());
+ flinkConfig.set(
+ ApplicationConfiguration.APPLICATION_MAIN_CLASS,
+ "org.apache.flink.cdc.cli.CliFrontend");
+ KubernetesClusterClientFactory kubernetesClusterClientFactory =
+ new KubernetesClusterClientFactory();
+ KubernetesClusterDescriptor descriptor =
+
kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig);
+ ClusterSpecification specification =
+
kubernetesClusterClientFactory.getClusterSpecification(flinkConfig);
+ ApplicationConfiguration applicationConfiguration =
+ ApplicationConfiguration.fromConfiguration(flinkConfig);
+ ClusterClient<String> client = null;
+ try {
+ ClusterClientProvider<String> clusterClientProvider =
+ descriptor.deployApplicationCluster(specification,
applicationConfiguration);
+ client = clusterClientProvider.getClusterClient();
+ String clusterId = client.getClusterId();
+ LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId);
+ return new PipelineExecution.ExecutionInfo(clusterId, "submit job
successful");
+ } catch (Exception e) {
+ if (client != null) {
+ client.shutDownCluster();
+ }
+ throw new RuntimeException("Failed to deploy Flink CDC job", e);
+ } finally {
+ descriptor.close();
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
index 2eb79aead..4f7649d9e 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
@@ -93,6 +93,12 @@ public class FactoryDiscoveryUtils {
try {
T factory = getFactoryByIdentifier(identifier, factoryClass);
URL url =
factory.getClass().getProtectionDomain().getCodeSource().getLocation();
+ String urlString = url.toString();
+ if (urlString.contains("usrlib")) {
+ String flinkHome = System.getenv("FLINK_HOME");
+ urlString = urlString.replace("usrlib", flinkHome + "/usrlib");
+ }
+ url = new URL(urlString);
if (Files.isDirectory(Paths.get(url.toURI()))) {
LOG.warn(
"The factory class \"{}\" is contained by directory
\"{}\" instead of JAR. "
@@ -104,7 +110,8 @@ public class FactoryDiscoveryUtils {
return Optional.of(url);
} catch (Exception e) {
throw new RuntimeException(
- String.format("Failed to search JAR by factory identifier
\"%s\"", identifier));
+ String.format("Failed to search JAR by factory identifier
\"%s\"", identifier),
+ e);
}
}
}