This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 70197772d [Feature] Add CDC jar client for support cdc yaml  (#4166)
70197772d is described below

commit 70197772df8650d824bc083db89041a675f6c114
Author: ouyangwulin <[email protected]>
AuthorDate: Mon Jan 13 23:12:53 2025 +0800

    [Feature] Add CDC jar client for support cdc yaml  (#4166)
    
    * [fixed] Fixed the environment settings not loading flink-conf.yaml; without it, settings such as the catalog store configuration do not take effect
    
    * [Feature] Add CDC jar client for support cdc yaml
    
    * [Feature] The front-end is modified to support the cdc yaml api
    
    * fixed package and log info
    
    * delete unused method
    
    * fixed module
---
 .../streampark/common/constants/Constants.java     |   6 +-
 .../console/core/util/ServiceHelper.java           |  24 ++++
 streampark-flink/pom.xml                           |   1 +
 .../streampark-flink-cdcclient/pom.xml             | 140 +++++++++++++++++++++
 .../apache/streampark/flink/cdc/cli/CDCClient.java |  80 ++++++++++++
 .../streampark/flink/cdc/cli/CDCExecutor.java      |  69 ++++++++++
 .../streampark/flink/cdc/cli/package-info.java     |  18 +++
 .../flink/core/FlinkTableInitializer.scala         |   6 +
 8 files changed, 343 insertions(+), 1 deletion(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java
index b6d84e272..c8f27bcb5 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/constants/Constants.java
@@ -19,7 +19,9 @@ package org.apache.streampark.common.constants;
 
 import java.time.Duration;
 
-/** A constant class to hold the constants variables. */
+/**
+ * A constant class to hold the constants variables.
+ */
 public final class Constants {
 
     private Constants() {
@@ -51,6 +53,8 @@ public final class Constants {
 
     public static final String STREAMPARK_SPARKSQL_CLIENT_CLASS = 
"org.apache.streampark.spark.cli.SqlClient";
 
+    public static final String STREAMPARK_FLINKCDC_CLIENT_CLASS = 
"org.apache.streampark.flink.cdc.cli.CDCClient";
+
     public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3";
 
     public static final String SINGLE_SLASH = "/";
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java
index 252b7364e..02a6e8ab3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/ServiceHelper.java
@@ -39,6 +39,8 @@ public class ServiceHelper {
 
     private static String flinkSqlClientJar = null;
 
+    private static String flinkCDCClientJar = null;
+
     private static String sparkSqlClientJar = null;
 
     public static User getLoginUser() {
@@ -58,6 +60,41 @@ public class ServiceHelper {
         return null;
     }
 
+    /**
+     * Looks up the StreamPark Flink CDC client jar in the app client directory.
+     *
+     * <p>The jar name must match {@code streampark-flink-cdcclient_<scalaVersion>-*.jar}, using
+     * the Scala version of {@code flinkEnv}. The first successful lookup is cached in a static
+     * field and returned on later calls regardless of the {@code flinkEnv} passed in.
+     *
+     * <p>Raises an alert via {@code ApiAlertException} if the directory does not exist, or if
+     * no jar or more than one jar matches.
+     *
+     * @param flinkEnv the Flink environment whose Scala version selects the jar
+     * @return the file name (not the full path) of the matching jar
+     */
+    public static String getFlinkCDCClientJar(FlinkEnv flinkEnv) {
+        if (flinkCDCClientJar == null) {
+            File localClient = WebUtils.getAppClientDir();
+            ApiAlertException.throwIfFalse(localClient.exists(),
+                "[StreamPark]" + localClient + " no exists. please check.");
+            String regex = String.format("streampark-flink-cdcclient_%s-.*\\.jar", flinkEnv.getScalaVersion());
+
+            List<String> jars = Arrays.stream(Objects.requireNonNull(localClient.list()))
+                .filter(x -> x.matches(regex))
+                .collect(Collectors.toList());
+            ApiAlertException.throwIfTrue(
+                jars.isEmpty(),
+                "[StreamPark] can't found streampark-flink-cdcclient jar in " + localClient);
+
+            ApiAlertException.throwIfTrue(
+                jars.size() > 1,
+                "[StreamPark] found multiple streampark-flink-cdcclient jar in " + localClient);
+            flinkCDCClientJar = jars.get(0);
+        }
+        return flinkCDCClientJar;
+    }
+
     public static String getFlinkSqlClientJar(FlinkEnv flinkEnv) {
         if (flinkSqlClientJar == null) {
             File localClient = WebUtils.getAppClientDir();
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 7976177a7..154362715 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -39,6 +39,7 @@
         <module>streampark-flink-kubernetes</module>
         <module>streampark-flink-catalog-store</module>
         <module>streampark-flink-connector-plugin</module>
+        <module>streampark-flink-cdcclient</module>
     </modules>
 
     <dependencies>
diff --git a/streampark-flink/streampark-flink-cdcclient/pom.xml 
b/streampark-flink/streampark-flink-cdcclient/pom.xml
new file mode 100644
index 000000000..ca972a301
--- /dev/null
+++ b/streampark-flink/streampark-flink-cdcclient/pom.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </parent>
+
+    <!-- Must match the "streampark-flink-cdcclient_<scala>-*.jar" pattern the console searches for. -->
+    <artifactId>streampark-flink-cdcclient_${scala.binary.version}</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <name>StreamPark : Flink CDC Client</name>
+
+    <properties>
+        <flink.cdc.version>3.2.1</flink.cdc.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-runtime</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-cli</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-composer</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+
+        <!-- Provided scope: these Flink classes are not bundled into the shaded jar. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-common_${scala.binary.version}</artifactId>
+        </dependency>
+        <!-- Used by CDCClient for YarnConfigOptions; keep aligned with the other Flink artifacts. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Builds a self-contained jar whose Main-Class is CDCClient. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-dist</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <shadeTestJar>false</shadeTestJar>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <artifactSet>
+                                <includes>
+                                    <include>*</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.commons.cli</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.shaded.com.apache.commons.cli</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.calcite</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.shaded.org.apache.calcite</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.streampark.flink.cdc.cli.CDCClient</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java
 
b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java
new file mode 100644
index 000000000..379359548
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCClient.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
+
+import org.apache.streampark.common.conf.ConfigKeys;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.PropertiesUtils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Entry point of the StreamPark Flink CDC client jar.
+ *
+ * <p>Reads the pipeline YAML, application name, Flink configuration and optional parallelism
+ * from the program arguments and runs the pipeline through {@link CDCExecutor}.
+ */
+public class CDCClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CDCClient.class);
+
+    /**
+     * Decodes the client arguments and submits the CDC pipeline.
+     *
+     * @param args program arguments; the YAML, application name and Flink configuration values
+     *     are required and are decoded with {@code DeflaterUtils.unzipString}
+     * @throws IllegalArgumentException if any of the required values is missing or blank
+     * @throws Exception propagated from {@link CDCExecutor#run()}
+     */
+    public static void main(String[] args) throws Exception {
+        ParameterTool parameter = ParameterTool.fromArgs(args);
+        Map<String, String> configMap = new HashMap<>();
+        String cdcYamlDecode = parameter.get(ConfigKeys.KEY_FLINK_SQL(null));
+        String appNameDecode = parameter.get(ConfigKeys.KEY_APP_NAME(null));
+        String flinkConfigDecode = parameter.get(ConfigKeys.KEY_FLINK_CONF(null));
+        String parallelism = parameter.get(ConfigKeys.KEY_FLINK_PARALLELISM(null));
+        if (StringUtils.isNullOrWhitespaceOnly(cdcYamlDecode)
+            || StringUtils.isNullOrWhitespaceOnly(appNameDecode)
+            || StringUtils.isNullOrWhitespaceOnly(flinkConfigDecode)) {
+            String message = "--flink.conf or --app.name or `cdc yaml` must not be null.";
+            LOG.error(message);
+            // Fail instead of returning normally so a missing argument surfaces as an error.
+            throw new IllegalArgumentException(message);
+        }
+
+        String cdcYaml = DeflaterUtils.unzipString(cdcYamlDecode);
+        String appName = DeflaterUtils.unzipString(appNameDecode);
+        String flinkConfigString = DeflaterUtils.unzipString(flinkConfigDecode);
+        configMap.putAll(PropertiesUtils.fromYamlTextAsJava(flinkConfigString));
+        configMap.put(YarnConfigOptions.APPLICATION_NAME.key(), appName);
+        // Only override the default-parallelism key when a parallelism argument was supplied;
+        // otherwise keep the value from flink.conf rather than replacing it with null.
+        if (!StringUtils.isNullOrWhitespaceOnly(parallelism)) {
+            configMap.put(CoreOptions.DEFAULT_PARALLELISM.key(), parallelism);
+        }
+        Configuration flinkConfig = Configuration.fromMap(configMap);
+        // NOTE(review): these debug lines may print credentials contained in the config or YAML.
+        LOG.debug("Flink cdc config {}", flinkConfig);
+        LOG.debug("Flink cdc yaml {}", cdcYaml);
+        PipelineExecution.ExecutionInfo result =
+            new CDCExecutor(cdcYaml, flinkConfig, new ArrayList<>(), SavepointRestoreSettings.none()).run();
+        printExecutionInfo(result);
+    }
+
+    /** Prints a submission notice with the job id and description to standard output. */
+    private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) {
+        System.out.println("Pipeline has been submitted to cluster.");
+        System.out.printf("Job ID: %s\n", info.getId());
+        System.out.printf("Job Description: %s\n", info.getDescription());
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java
 
b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java
new file mode 100644
index 000000000..78a46d7a8
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-cdcclient/src/main/java/org/apache/streampark/flink/cdc/cli/CDCExecutor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
+
+import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
+import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.composer.PipelineComposer;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * Parses a Flink CDC pipeline YAML definition and executes it through a
+ * {@link PipelineComposer} obtained from {@code FlinkEnvironmentUtils}.
+ */
+public class CDCExecutor {
+
+    /** Pipeline definition as YAML text. */
+    private final String pipelineString;
+    /** Configuration handed to both the YAML parser and the composer factory. */
+    private final Configuration configuration;
+    private final SavepointRestoreSettings savePointSettings;
+    private final List<Path> additionalJar;
+
+    /** Created on first use by {@link #getComposer()} and reused afterwards. */
+    private PipelineComposer composer;
+
+    public CDCExecutor(String pipelineString,
+                       Configuration flinkConfig,
+                       List<Path> additionalJar,
+                       SavepointRestoreSettings savePointRestoreSettings) {
+        this.pipelineString = pipelineString;
+        this.configuration = flinkConfig;
+        this.additionalJar = additionalJar;
+        this.savePointSettings = savePointRestoreSettings;
+    }
+
+    /**
+     * Parses the pipeline YAML, composes it into a pipeline execution and runs it.
+     *
+     * @return the execution info returned by {@code PipelineExecution.execute()}
+     * @throws Exception propagated from parsing, composing or executing the pipeline
+     */
+    public PipelineExecution.ExecutionInfo run() throws Exception {
+        PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = pipelineDefinitionParser.parse(pipelineString, configuration);
+        PipelineExecution execution = getComposer().compose(pipelineDef);
+        return execution.execute();
+    }
+
+    /**
+     * Returns the cached composer, creating it on the first call.
+     *
+     * <p>NOTE(review): the meaning of the leading {@code true} argument is defined by
+     * {@code FlinkEnvironmentUtils.createComposer}; confirm it against the flink-cdc version in use.
+     */
+    private PipelineComposer getComposer() throws Exception {
+        if (composer == null) {
+            // Store the instance so later calls reuse it instead of creating a new composer.
+            composer = FlinkEnvironmentUtils.createComposer(
+                true, configuration, additionalJar, savePointSettings);
+        }
+        return composer;
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java
 
b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java
new file mode 100644
index 000000000..b85a06e75
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-cdcclient/src/test/java/org/apache/streampark/flink/cdc/cli/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.cdc.cli;
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 7cb463ed7..8e1e19b7b 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -118,6 +118,12 @@ private[flink] class FlinkTableInitializer(args: 
Array[String], apiType: ApiType
         }
     }
 
+    parameter.get(KEY_FLINK_CONF(), null) match {
+      case null | "" =>
+        throw new ExceptionInInitializerError(
+          "[StreamPark] Usage:can't find config,please set \"--flink.conf 
$conf \" in main arguments")
+      case conf => 
builder.withConfiguration(Configuration.fromMap(PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(conf))))
+    }
     val buildWith =
       (parameter.get(KEY_FLINK_TABLE_CATALOG), 
parameter.get(KEY_FLINK_TABLE_DATABASE))
     buildWith match {

Reply via email to