This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b0357fa163 [INLONG-10056][Manager] Support new manager plugin for flink 1.18 (#10077)
b0357fa163 is described below
commit b0357fa163e3e13fe8abe7e93415e6ffa8372726
Author: AloysZhang <[email protected]>
AuthorDate: Fri Apr 26 09:18:09 2024 +0800
[INLONG-10056][Manager] Support new manager plugin for flink 1.18 (#10077)
---
inlong-manager/manager-plugins/base/pom.xml | 47 +++++++--
.../base/src/main/assembly/assembly.xml | 21 ++++
.../{base => manager-plugins-flink-v1.18}/pom.xml | 84 ++++++----------
.../manager/plugin/flink/FlinkClientService.java | 112 +++++++++++++++++++++
inlong-manager/manager-plugins/pom.xml | 33 +++++-
5 files changed, 230 insertions(+), 67 deletions(-)
diff --git a/inlong-manager/manager-plugins/base/pom.xml b/inlong-manager/manager-plugins/base/pom.xml
index f49388b5ba..9c738b7e69 100644
--- a/inlong-manager/manager-plugins/base/pom.xml
+++ b/inlong-manager/manager-plugins/base/pom.xml
@@ -45,17 +45,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>manager-plugins-flink-v1.13</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>manager-plugins-flink-v1.15</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@@ -116,4 +105,40 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>v1.13</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>manager-plugins-flink-v1.13</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>v1.15</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>manager-plugins-flink-v1.15</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>v1.18</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>manager-plugins-flink-v1.18</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
diff --git a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
index 2779612f40..bc9c164d6a 100644
--- a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
+++ b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
@@ -52,6 +52,15 @@
</includes>
</fileSet>
+ <!-- Plugins Flink v1.18 -->
+ <fileSet>
+ <directory>../manager-plugins-flink-v1.18/target</directory>
+ <outputDirectory>./</outputDirectory>
+ <includes>
+ <include>manager-plugins-flink-v1.18.jar</include>
+ </includes>
+ </fileSet>
+
<!-- Flink v1.13 dependencies -->
<fileSet>
<directory>../manager-plugins-flink-v1.13/target</directory>
@@ -75,5 +84,17 @@
<include>manager-plugins-flink-v1.15.jar</include>
</includes>
</fileSet>
+
+ <!-- Flink v1.18 dependencies -->
+ <fileSet>
+ <directory>../manager-plugins-flink-v1.18/target</directory>
+ <outputDirectory>./flink-v1.18</outputDirectory>
+ <includes>
+ <include>flink-*.jar</include>
+ <include>sort-flink-*.jar</include>
+ <include>scala-*.jar</include>
+ <include>manager-plugins-flink-v1.18.jar</include>
+ </includes>
+ </fileSet>
</fileSets>
</assembly>
\ No newline at end of file
diff --git a/inlong-manager/manager-plugins/base/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
similarity index 50%
copy from inlong-manager/manager-plugins/base/pom.xml
copy to inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
index f49388b5ba..6ad32208c5 100644
--- a/inlong-manager/manager-plugins/base/pom.xml
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
@@ -24,96 +24,72 @@
<version>1.13.0-SNAPSHOT</version>
</parent>
- <artifactId>manager-plugins-base</artifactId>
- <name>Apache InLong - Manager Plugins Base</name>
+ <artifactId>manager-plugins-flink-v1.18</artifactId>
+ <name>Apache InLong - Manager Plugins Flink v1.18</name>
<properties>
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
+ <flink.version>1.18.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>manager-common</artifactId>
+ <artifactId>sort-flink-dependencies-v1.18</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-file-sink-common</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>manager-workflow</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
</dependency>
-
<dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>manager-plugins-flink-v1.13</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>manager-plugins-flink-v1.15</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ <version>2.15.3-18.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
+ <finalName>manager-plugins-flink-v1.18</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-resources-plugin</artifactId>
+ <artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
- <id>copy-resources</id>
+ <id>copy-dependencies</id>
<goals>
- <goal>copy-resources</goal>
+ <goal>copy-dependencies</goal>
</goals>
- <phase>prepare-package</phase>
+ <phase>package</phase>
<configuration>
- <outputDirectory>target/plugins</outputDirectory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
+ <outputDirectory>target/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <finalName>plugins</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/main/assembly/assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <executions>
- <execution>
- <id>plugins</id>
- <goals>
- <goal>single</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
</project>
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
new file mode 100644
index 0000000000..58094e9bf7
--- /dev/null
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Flink service, such as save or get flink config info, etc.
+ */
+@Slf4j
+public class FlinkClientService {
+
+ private final Configuration configuration;
+ private final RestClusterClient<StandaloneClusterId> flinkClient;
+
+ public FlinkClientService(Configuration configuration) throws Exception {
+ this.configuration = configuration;
+ this.flinkClient = getFlinkClient();
+ }
+
+ /**
+ * Get the Flink Client.
+ */
+ public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception {
+ try {
+ return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
+ } catch (Exception e) {
+ log.error("get flink client failed: ", e);
+ throw new Exception("get flink client failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Get the job status by the given job id.
+ */
+ public JobStatus getJobStatus(String jobId) throws Exception {
+ try {
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<JobStatus> jobStatus = flinkClient.getJobStatus(jobID);
+ return jobStatus.get();
+ } catch (Exception e) {
+ log.error("get job status by jobId={} failed: ", jobId, e);
+ throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Get job detail by the given job id.
+ */
+ public JobDetailsInfo getJobDetail(String jobId) throws Exception {
+ try {
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<JobDetailsInfo> jobDetails = flinkClient.getJobDetails(jobID);
+ return jobDetails.get();
+ } catch (Exception e) {
+ log.error("get job detail by jobId={} failed: ", jobId, e);
+ throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Stop the Flink job with the savepoint.
+ */
+ public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception {
+ try {
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<String> stopResult = flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory,
+ SavepointFormatType.CANONICAL);
+ return stopResult.get();
+ } catch (Exception e) {
+ log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e);
+ throw new Exception("stop job " + jobId + " failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Cancel the Flink job.
+ */
+ public void cancelJob(String jobId) throws Exception {
+ try {
+ JobID jobID = JobID.fromHexString(jobId);
+ flinkClient.cancel(jobID);
+ } catch (Exception e) {
+ log.error("cancel job {} failed: ", jobId, e);
+ throw new Exception("cancel job " + jobId + " failed: " + e.getMessage());
+ }
+ }
+}
diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/pom.xml
index 6d3e1cd764..4bb92e96e0 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/pom.xml
@@ -29,8 +29,6 @@
<name>Apache InLong - Manager Plugins</name>
<modules>
- <module>manager-plugins-flink-v1.13</module>
- <module>manager-plugins-flink-v1.15</module>
<module>base</module>
</modules>
@@ -38,4 +36,35 @@
<inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
</properties>
+ <profiles>
+ <profile>
+ <id>flink-all-version</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <modules>
+ <module>manager-plugins-flink-v1.13</module>
+ <module>manager-plugins-flink-v1.15</module>
+ <module>manager-plugins-flink-v1.18</module>
+ </modules>
+ </profile>
+ <profile>
+ <id>v1.13</id>
+ <modules>
+ <module>manager-plugins-flink-v1.13</module>
+ </modules>
+ </profile>
+ <profile>
+ <id>v1.15</id>
+ <modules>
+ <module>manager-plugins-flink-v1.15</module>
+ </modules>
+ </profile>
+ <profile>
+ <id>v1.18</id>
+ <modules>
+ <module>manager-plugins-flink-v1.18</module>
+ </modules>
+ </profile>
+ </profiles>
</project>