This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 70197772d [Feature] Add CDC jar client for support cdc yaml (#4166)
70197772d is described below
commit 70197772df8650d824bc083db89041a675f6c114
Author: ouyangwulin <[email protected]>
AuthorDate: Mon Jan 13 23:12:53 2025 +0800
[Feature] Add CDC jar client for support cdc yaml (#4166)
* [Fix] env setting did not load flink-conf.yaml; without it, settings such as
the catalogstore conf will not take effect
* [Feature] Add CDC jar client for support cdc yaml
* [Feature] The front-end is modified to support the cdc yaml api
* Fixed package naming and log messages
* Deleted unused method
* Fixed module declaration
---
.../streampark/common/constants/Constants.java | 6 +-
.../console/core/util/ServiceHelper.java | 24 ++++
streampark-flink/pom.xml | 1 +
.../streampark-flink-cdcclient/pom.xml | 140 +++++++++++++++++++++
.../apache/streampark/flink/cdc/cli/CDCClient.java | 80 ++++++++++++
.../streampark/flink/cdc/cli/CDCExecutor.java | 69 ++++++++++
.../streampark/flink/cdc/cli/package-info.java | 18 +++
.../flink/core/FlinkTableInitializer.scala | 6 +
8 files changed, 343 insertions(+), 1 deletion(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java
b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java
index b6d84e272..c8f27bcb5 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java
@@ -19,7 +19,9 @@ package org.apache.streampark.common.constants;
import java.time.Duration;
-/** A constant class to hold the constants variables. */
+/**
+ * A constant class to hold the constants variables.
+ */
public final class Constants {
private Constants() {
@@ -51,6 +53,8 @@ public final class Constants {
public static final String STREAMPARK_SPARKSQL_CLIENT_CLASS =
"org.apache.streampark.spark.cli.SqlClient";
+ public static final String STREAMPARK_FLINKCDC_CLIENT_CLASS =
"org.apache.streampark.flink.cdc.cli.CDCClient";
+
public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3";
public static final String SINGLE_SLASH = "/";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java
index 252b7364e..02a6e8ab3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java
@@ -39,6 +39,8 @@ public class ServiceHelper {
private static String flinkSqlClientJar = null;
+ private static String flinkCDCClientJar = null;
+
private static String sparkSqlClientJar = null;
public static User getLoginUser() {
@@ -58,6 +60,28 @@ public class ServiceHelper {
return null;
}
+ public static String getFlinkCDCClientJar(FlinkEnv flinkEnv) {
+ if (flinkCDCClientJar == null) {
+ File localClient = WebUtils.getAppClientDir();
+ ApiAlertException.throwIfFalse(localClient.exists(),
+ "[StreamPark]" + localClient + " no exists. please check.");
+ String regex =
String.format("streampark-flink-cdcclient_%s-.*\\.jar",
flinkEnv.getScalaVersion());
+
+ List<String> jars =
Arrays.stream(Objects.requireNonNull(localClient.list()))
+ .filter(x -> x.matches(regex))
+ .collect(Collectors.toList());
+ ApiAlertException.throwIfTrue(
+ jars.isEmpty(),
+ "[StreamPark] can't found streampark-flink-cdcclient jar in "
+ localClient);
+
+ ApiAlertException.throwIfTrue(
+ jars.size() > 1,
+ "[StreamPark] found multiple streampark-flink-cdclient jar in
" + localClient);
+ flinkCDCClientJar = jars.get(0);
+ }
+ return flinkCDCClientJar;
+ }
+
public static String getFlinkSqlClientJar(FlinkEnv flinkEnv) {
if (flinkSqlClientJar == null) {
File localClient = WebUtils.getAppClientDir();
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 7976177a7..154362715 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -39,6 +39,7 @@
<module>streampark-flink-kubernetes</module>
<module>streampark-flink-catalog-store</module>
<module>streampark-flink-connector-plugin</module>
+ <module>streampark-flink-cdcclient</module>
</modules>
<dependencies>
diff --git a/streampark-flink/streampark-flink-cdcclient/pom.xml
b/streampark-flink/streampark-flink-cdcclient/pom.xml
new file mode 100644
index 000000000..ca972a301
--- /dev/null
+++ b/streampark-flink/streampark-flink-cdcclient/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+
<artifactId>streampark-flink-cdc-client_${scala.binary.version}</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <name>StreamPark : Flink CDC Client</name>
+
+ <properties>
+ <flink.cdc.version>3.2.1</flink.cdc.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-cdc-common</artifactId>
+ <version>${flink.cdc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-cdc-runtime</artifactId>
+ <version>${flink.cdc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-cdc-cli</artifactId>
+ <version>${flink.cdc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-cdc-composer</artifactId>
+ <version>${flink.cdc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-common_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-yarn</artifactId>
+ <version>1.18.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-dist</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <shadeTestJar>false</shadeTestJar>
+
<createDependencyReducedPom>false</createDependencyReducedPom>
+
<shadedArtifactAttached>false</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>*</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.commons.cli</pattern>
+
<shadedPattern>org.apache.flink.cdc.shaded.com.apache.commons.cli</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.calcite</pattern>
+
<shadedPattern>org.apache.flink.cdc.shaded.org.apache.calcite</shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+
<mainClass>org.apache.streampark.flink.cdc.cli.CDCClient</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java
b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java
new file mode 100644
index 000000000..379359548
--- /dev/null
+++
b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
+
+import org.apache.streampark.common.conf.ConfigKeys;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.PropertiesUtils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * cdc client
+ */
+public class CDCClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CDCClient.class);
+
+ public static void main(String[] args) throws Exception {
+ ParameterTool parameter = ParameterTool.fromArgs(args);
+ Map<String, String> configMap = new HashMap<>();
+ String cdcYamlDecode = parameter.get(ConfigKeys.KEY_FLINK_SQL(null));
+ String appNameDecode = parameter.get(ConfigKeys.KEY_APP_NAME(null));
+ String flinkConfigDecode =
parameter.get(ConfigKeys.KEY_FLINK_CONF(null));
+ String parallelism =
parameter.get(ConfigKeys.KEY_FLINK_PARALLELISM(null));
+ if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode)
+ || StringUtils.isNullOrWhitespaceOnly(appNameDecode)
+ || StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) {
+ LOG.error("--flink.conf or --app.name or `cdc yaml` must not be
null.");
+ return;
+ }
+
+ String cdcYaml = DeflaterUtils.unzipString(cdcYamlDecode);
+ String appName = DeflaterUtils.unzipString(appNameDecode);
+ String flinkConfigString =
DeflaterUtils.unzipString(flinkConfigDecode);
+
configMap.putAll(PropertiesUtils.fromYamlTextAsJava(flinkConfigString));
+ configMap.put(YarnConfigOptions.APPLICATION_NAME.key(), appName);
+ configMap.put(CoreOptions.DEFAULT_PARALLELISM.key(), parallelism);
+ Configuration flinkConfig = Configuration.fromMap(configMap);
+ LOG.debug("Flink cdc config {}", flinkConfig);
+ LOG.debug("Flink cdc yaml {}", cdcYaml);
+ PipelineExecution.ExecutionInfo result =
+ new CDCExecutor(cdcYaml, flinkConfig, new ArrayList<>(),
SavepointRestoreSettings.none()).run();
+ printExecutionInfo(result);
+
+ }
+
+ private static void printExecutionInfo(PipelineExecution.ExecutionInfo
info) {
+ System.out.println("Pipeline has been submitted to cluster.");
+ System.out.printf("Job ID: %s\n", info.getId());
+ System.out.printf("Job Description: %s\n", info.getDescription());
+ }
+}
diff --git
a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java
b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java
new file mode 100644
index 000000000..78a46d7a8
--- /dev/null
+++
b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
+
+import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
+import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.composer.PipelineComposer;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * cdc executor
+ */
+public class CDCExecutor {
+
+ private final String pipelineString;
+ private final Configuration configuration;
+ private final SavepointRestoreSettings savePointSettings;
+ private final List<Path> additionalJar;
+
+ private PipelineComposer composer;
+
+ public CDCExecutor(String pipelineString,
+ Configuration flinkConfig,
+ List<Path> additionalJar,
+ SavepointRestoreSettings savePointRestoreSettings) {
+ this.pipelineString = pipelineString;
+ this.configuration = flinkConfig;
+ this.additionalJar = additionalJar;
+ this.savePointSettings = savePointRestoreSettings;
+ }
+
+ public PipelineExecution.ExecutionInfo run() throws Exception {
+ PipelineDefinitionParser pipelineDefinitionParser = new
YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineString, configuration);
+ PipelineComposer composer = getComposer();
+ PipelineExecution execution = composer.compose(pipelineDef);
+ return execution.execute();
+ }
+
+ private PipelineComposer getComposer() throws Exception {
+ if (composer == null) {
+ return FlinkEnvironmentUtils.createComposer(
+ true, configuration, additionalJar, savePointSettings);
+ }
+ return composer;
+ }
+}
diff --git
a/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java
b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java
new file mode 100644
index 000000000..b85a06e75
--- /dev/null
+++
b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 7cb463ed7..8e1e19b7b 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -118,6 +118,12 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
}
}
+ parameter.get(KEY_FLINK_CONF(), null) match {
+ case null | "" =>
+ throw new ExceptionInInitializerError(
+ "[StreamPark] Usage:can't find config,please set \"--flink.conf
$conf \" in main arguments")
+ case conf =>
builder.withConfiguration(Configuration.fromMap(PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(conf))))
+ }
val buildWith =
(parameter.get(KEY_FLINK_TABLE_CATALOG),
parameter.get(KEY_FLINK_TABLE_DATABASE))
buildWith match {