This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new db75ab1e9e [INLONG-8264][Manager] Manager supports Flink 1.15 (#8265)
db75ab1e9e is described below
commit db75ab1e9e56ceec6df3d2d1e98ee92b5abe7b99
Author: haifxu <[email protected]>
AuthorDate: Mon Jul 10 20:36:53 2023 +0800
[INLONG-8264][Manager] Manager supports Flink 1.15 (#8265)
---
inlong-manager/manager-plugins/{ => base}/pom.xml | 28 +++--
.../{ => base}/src/main/assembly/assembly.xml | 39 +++++++
.../manager/plugin/FlinkSortPollerPlugin.java | 0
.../manager/plugin/FlinkSortProcessPlugin.java | 0
.../manager/plugin/flink/FlinkOperation.java | 0
.../inlong/manager/plugin/flink/FlinkService.java | 58 ++---------
.../plugin/flink/IntegrationTaskRunner.java | 0
.../manager/plugin/flink/TaskRunService.java | 0
.../manager/plugin/flink/dto/FlinkConfig.java | 3 +
.../inlong/manager/plugin/flink/dto/FlinkInfo.java | 0
.../manager/plugin/flink/dto/JarEntryInfo.java | 0
.../manager/plugin/flink/dto/JarFileInfo.java | 0
.../manager/plugin/flink/dto/JarListInfo.java | 0
.../manager/plugin/flink/dto/JarRunRequest.java | 0
.../inlong/manager/plugin/flink/dto/Jars.java | 0
.../inlong/manager/plugin/flink/dto/LoginConf.java | 0
.../plugin/flink/dto/StopWithSavepointRequest.java | 0
.../plugin/flink/enums/ConnectorJarType.java | 0
.../manager/plugin/flink/enums/Constants.java | 7 ++
.../manager/plugin/flink/enums/TaskCommitType.java | 0
.../plugin/listener/DeleteSortListener.java | 0
.../plugin/listener/DeleteStreamListener.java | 0
.../plugin/listener/RestartSortListener.java | 0
.../plugin/listener/RestartStreamListener.java | 0
.../plugin/listener/StartupSortListener.java | 0
.../plugin/listener/StartupStreamListener.java | 0
.../plugin/listener/SuspendSortListener.java | 0
.../plugin/listener/SuspendStreamListener.java | 0
.../manager/plugin/poller/SortStatusPoller.java | 0
.../manager/plugin/util/FlinkConfiguration.java | 2 +
.../manager/plugin/util/FlinkServiceUtils.java | 59 +++++++++++
.../inlong/manager/plugin/util/FlinkUtils.java | 0
.../src/main/resources/META-INF/plugin.yaml | 0
.../main/resources/flink-sort-plugin.properties | 2 +
.../plugin/listener/DeleteSortListenerTest.java | 0
.../plugin/listener/RestartSortListenerTest.java | 0
.../plugin/listener/StartupSortListenerTest.java | 0
.../plugin/listener/SuspendSortListenerTest.java | 0
.../manager-plugins-flink-v1.13/pom.xml | 75 ++++++++++++++
.../manager/plugin/flink/FlinkClientService.java | 112 ++++++++++++++++++++
.../{ => manager-plugins-flink-v1.15}/pom.xml | 89 ++++++----------
.../manager/plugin/flink/FlinkClientService.java | 114 +++++++++++++++++++++
inlong-manager/manager-plugins/pom.xml | 96 ++---------------
.../manager/service/plugin/PluginClassLoader.java | 3 +-
.../manager/service/plugin/PluginService.java | 1 -
...xample.jar => manager-plugins-base-example.jar} | Bin
inlong-manager/manager-web/assembly.xml | 7 +-
inlong-manager/manager-web/bin/startup.sh | 4 +-
inlong-manager/manager-web/pom.xml | 2 +-
.../sort-connectors/postgres-cdc/pom.xml | 5 +
.../sort/postgre/PostgreSQLTableFactory.java | 6 ++
51 files changed, 495 insertions(+), 217 deletions(-)
diff --git a/inlong-manager/manager-plugins/pom.xml
b/inlong-manager/manager-plugins/base/pom.xml
similarity index 86%
copy from inlong-manager/manager-plugins/pom.xml
copy to inlong-manager/manager-plugins/base/pom.xml
index c60b924f7e..cc77e45ec1 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/base/pom.xml
@@ -20,15 +20,15 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.inlong</groupId>
- <artifactId>inlong-manager</artifactId>
+ <artifactId>manager-plugins</artifactId>
<version>1.9.0-SNAPSHOT</version>
</parent>
- <artifactId>manager-plugins</artifactId>
- <name>Apache InLong - Manager Plugins</name>
+ <artifactId>manager-plugins-base</artifactId>
+ <name>Apache InLong - Manager Plugins Base</name>
<properties>
- <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
+ <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
</properties>
<dependencies>
@@ -44,25 +44,21 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+
<dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>manager-plugins-flink-v1.13</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>sort-flink-dependencies-v1.13</artifactId>
+ <artifactId>manager-plugins-flink-v1.15</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- </exclusions>
</dependency>
+
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java</artifactId>
- <version>${flink.version}</version>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/inlong-manager/manager-plugins/src/main/assembly/assembly.xml
b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
similarity index 52%
rename from inlong-manager/manager-plugins/src/main/assembly/assembly.xml
rename to inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
index e3c6f29b38..f7818d38d4 100644
--- a/inlong-manager/manager-plugins/src/main/assembly/assembly.xml
+++ b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
@@ -33,5 +33,44 @@
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
</fileSet>
+
+ <!-- Plugins Flink v1.13 -->
+ <fileSet>
+ <directory>../manager-plugins-flink-v1.13/target</directory>
+ <outputDirectory>./</outputDirectory>
+ <includes>
+ <include>manager-plugins-flink-v1.13.jar</include>
+ </includes>
+ </fileSet>
+
+ <!-- Plugins Flink v1.15 -->
+ <fileSet>
+ <directory>../manager-plugins-flink-v1.15/target</directory>
+ <outputDirectory>./</outputDirectory>
+ <includes>
+ <include>manager-plugins-flink-v1.15.jar</include>
+ </includes>
+ </fileSet>
+
+ <!-- Flink v1.13 dependencies -->
+ <fileSet>
+ <directory>../manager-plugins-flink-v1.13/target</directory>
+ <outputDirectory>./flink-v1.13</outputDirectory>
+ <includes>
+ <include>flink-*.jar</include>
+ <include>sort-flink-*.jar</include>
+ <include>scala-*.jar</include>
+ </includes>
+ </fileSet>
+
+ <!-- Flink v1.15 dependencies -->
+ <fileSet>
+ <directory>../manager-plugins-flink-v1.15/target</directory>
+ <outputDirectory>./flink-v1.15</outputDirectory>
+ <includes>
+ <include>flink-*.jar</include>
+ <include>sort-flink-*.jar</include>
+ </includes>
+ </fileSet>
</fileSets>
</assembly>
\ No newline at end of file
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
similarity index 77%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index e83c712626..3732279889 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
+import org.apache.inlong.manager.plugin.util.FlinkServiceUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -64,6 +65,7 @@ public class FlinkService {
private final Integer parallelism;
private final String savepointDirectory;
private final Configuration configuration;
+ private final FlinkClientService clientService;
/**
* Constructor of FlinkService.
@@ -93,6 +95,8 @@ public class FlinkService {
}
configuration.setString(JobManagerOptions.ADDRESS, address);
configuration.setInteger(RestOptions.PORT, port);
+
+ clientService = (FlinkClientService) FlinkServiceUtils.getFlinkClientService(configuration, flinkConfig);
}
/**
@@ -117,46 +121,18 @@ public class FlinkService {
return flinkConfig;
}
- /**
- * Get the Flink Client.
- */
- public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception {
- try {
- return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
- } catch (Exception e) {
- log.error("get flink client failed: ", e);
- throw new Exception("get flink client failed: " + e.getMessage());
- }
- }
-
/**
* Get the job status by the given job id.
*/
public JobStatus getJobStatus(String jobId) throws Exception {
- try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
- JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<JobStatus> jobStatus = client.getJobStatus(jobID);
- return jobStatus.get();
- } catch (Exception e) {
- log.error("get job status by jobId={} failed: ", jobId, e);
- throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage());
- }
+ return clientService.getJobStatus(jobId);
}
/**
* Get job detail by the given job id.
*/
public JobDetailsInfo getJobDetail(String jobId) throws Exception {
- try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
- JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<JobDetailsInfo> jobDetails = client.getJobDetails(jobID);
- return jobDetails.get();
- } catch (Exception e) {
- log.error("get job detail by jobId={} failed: ", jobId, e);
- throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage());
- }
+ return clientService.getJobDetail(jobId);
}
/**
@@ -216,7 +192,7 @@ public class FlinkService {
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
jobGraph.addJars(connectorJars);
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ RestClusterClient<StandaloneClusterId> client = clientService.getFlinkClient();
CompletableFuture<JobID> result = client.submitJob(jobGraph);
return result.get().toString();
}
@@ -225,30 +201,14 @@ public class FlinkService {
* Stop the Flink job with the savepoint.
*/
public String stopJob(String jobId, StopWithSavepointRequest request) throws Exception {
- try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
- JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<String> stopResult = client.stopWithSavepoint(jobID, request.isDrain(),
- request.getTargetDirectory());
- return stopResult.get();
- } catch (Exception e) {
- log.error("stop job {} and request {} failed: ", jobId, request, e);
- throw new Exception("stop job " + jobId + " failed: " + e.getMessage());
- }
+ return clientService.stopJob(jobId, request.isDrain(), request.getTargetDirectory());
}
/**
* Cancel the Flink job.
*/
public void cancelJob(String jobId) throws Exception {
- try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
- JobID jobID = JobID.fromHexString(jobId);
- client.cancel(jobID);
- } catch (Exception e) {
- log.error("cancel job {} failed: ", jobId, e);
- throw new Exception("cancel job " + jobId + " failed: " + e.getMessage());
- }
+ clientService.cancelJob(jobId);
}
/**
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
similarity index 96%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
index dd2f58ae9d..54320a892c 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
@@ -39,4 +39,7 @@ public class FlinkConfig {
private String auditProxyHosts;
+ // flink version
+ private String version;
+
}
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
similarity index 92%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 6834fa0849..b5628e8664 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -41,6 +41,8 @@ public class Constants {
public static final String DRAIN = "flink.drain";
+ public static final String FLINK_VERSION = "flink.version";
+
// dataflow
public static final String SOURCE_INFO = "source_info";
@@ -58,6 +60,11 @@ public class Constants {
public static final String RESOURCE_ID = "resource_id";
+ // flink
+ public static final String FLINK_CLIENT_CLASS = "org.apache.inlong.manager.plugin.flink.FlinkClientService";
+
+ public static final String FLINK_JAR_NAME = "manager-plugins-flink-v%s.jar";
+
// REST API URL
public static final String JOB_URL = "/jobs";
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
similarity index 96%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
index 103316f630..cc84a652b8 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
@@ -31,6 +31,7 @@ import java.util.Properties;
import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
@@ -104,6 +105,7 @@ public class FlinkConfiguration {
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY));
+ flinkConfig.setVersion(properties.getProperty(FLINK_VERSION));
return flinkConfig;
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
new file mode 100644
index 0000000000..82cd665538
--- /dev/null
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.util;
+
+import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+@Slf4j
+public class FlinkServiceUtils {
+
+ private static final String DEFAULT_PLUGINS = "plugins";
+
+ private static final String FILE_PREFIX = "file://";
+
+ public static Object getFlinkClientService(Configuration configuration, FlinkConfig flinkConfig) {
+ log.info("Flink version {}", flinkConfig.getVersion());
+
+ Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath();
+ String flinkJarName = String.format(Constants.FLINK_JAR_NAME, flinkConfig.getVersion());
+ String flinkClientPath = FILE_PREFIX + pluginPath + File.separator + flinkJarName;
+ log.info("Start to load Flink jar: {}", flinkClientPath);
+
+ try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new URL(flinkClientPath)}, Thread.currentThread()
+ .getContextClassLoader())) {
+ Class<?> flinkClientService = classLoader.loadClass(Constants.FLINK_CLIENT_CLASS);
+ Object flinkService = flinkClientService.getDeclaredConstructor(Configuration.class)
+ .newInstance(configuration);
+ log.info("Successfully loaded Flink service");
+ return flinkService;
+ } catch (Exception e) {
+ log.error("Failed to loaded Flink service, please check flink client jar path: {}", flinkClientPath);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
rename to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
diff --git
a/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
b/inlong-manager/manager-plugins/base/src/main/resources/META-INF/plugin.yaml
similarity index 100%
rename from
inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
rename to
inlong-manager/manager-plugins/base/src/main/resources/META-INF/plugin.yaml
diff --git
a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
similarity index 92%
rename from
inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
rename to
inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
index 7b70c404ae..347d5caa9a 100644
---
a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
+++
b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
@@ -21,6 +21,8 @@
########################
# flink config
########################
+# Flink version, supported values: [1.13|1.15]
+flink.version=1.13
# the REST server address for Flink
flink.rest.address=127.0.0.1
# the REST server Port for Flink
diff --git
a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
rename to
inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
diff --git
a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
rename to
inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
diff --git
a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
rename to
inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
diff --git
a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
similarity index 100%
rename from
inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
rename to
inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml
new file mode 100644
index 0000000000..bf884777f5
--- /dev/null
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>manager-plugins</artifactId>
+ <version>1.9.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>manager-plugins-flink-v1.13</artifactId>
+ <name>Apache InLong - Manager Plugins Flink v1.13</name>
+
+ <properties>
+
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
+ <flink.version>1.13.5</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-flink-dependencies-v1.13</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>manager-plugins-flink-v1.13</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <outputDirectory>target/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
new file mode 100644
index 0000000000..4256db3842
--- /dev/null
+++
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Flink client service: queries job status and details, and stops or cancels jobs via a REST cluster client.
+ */
+@Slf4j
+public class FlinkClientService {
+
+ private final Configuration configuration;
+
+ public FlinkClientService(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ /**
+ * Get the Flink Client.
+ */
+ public RestClusterClient<StandaloneClusterId> getFlinkClient() throws
Exception {
+ try {
+ return new RestClusterClient<>(configuration,
StandaloneClusterId.getInstance());
+ } catch (Exception e) {
+ log.error("get flink client failed: ", e);
+ throw new Exception("get flink client failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Get the job status by the given job id.
+ */
+ public JobStatus getJobStatus(String jobId) throws Exception {
+ try {
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<JobStatus> jobStatus =
client.getJobStatus(jobID);
+ return jobStatus.get();
+ } catch (Exception e) {
+ log.error("get job status by jobId={} failed: ", jobId, e);
+ throw new Exception("get job status by jobId=" + jobId + " failed:
" + e.getMessage());
+ }
+ }
+
+ /**
+ * Get job detail by the given job id.
+ */
+ public JobDetailsInfo getJobDetail(String jobId) throws Exception {
+ try {
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<JobDetailsInfo> jobDetails =
client.getJobDetails(jobID);
+ return jobDetails.get();
+ } catch (Exception e) {
+ log.error("get job detail by jobId={} failed: ", jobId, e);
+ throw new Exception("get job detail by jobId=" + jobId + " failed:
" + e.getMessage());
+ }
+ }
+
+ /**
+ * Stop the Flink job with the savepoint.
+ */
+ public String stopJob(String jobId, boolean isDrain, String
savepointDirectory) throws Exception {
+ try {
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<String> stopResult =
client.stopWithSavepoint(jobID, isDrain, savepointDirectory);
+ return stopResult.get();
+ } catch (Exception e) {
+ log.error("stop job {} failed and savepoint directory is {} : ",
jobId, savepointDirectory, e);
+ throw new Exception("stop job " + jobId + " failed: " +
e.getMessage());
+ }
+ }
+
+ /**
+ * Cancel the Flink job.
+ */
+ public void cancelJob(String jobId) throws Exception {
+ try {
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ JobID jobID = JobID.fromHexString(jobId);
+ client.cancel(jobID);
+ } catch (Exception e) {
+ log.error("cancel job {} failed: ", jobId, e);
+ throw new Exception("cancel job " + jobId + " failed: " +
e.getMessage());
+ }
+ }
+}
diff --git a/inlong-manager/manager-plugins/pom.xml
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
similarity index 51%
copy from inlong-manager/manager-plugins/pom.xml
copy to inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
index c60b924f7e..bcb2b299f3 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
@@ -20,42 +20,27 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.inlong</groupId>
- <artifactId>inlong-manager</artifactId>
+ <artifactId>manager-plugins</artifactId>
<version>1.9.0-SNAPSHOT</version>
</parent>
- <artifactId>manager-plugins</artifactId>
- <name>Apache InLong - Manager Plugins</name>
+ <artifactId>manager-plugins-flink-v1.15</artifactId>
+ <name>Apache InLong - Manager Plugins Flink v1.15</name>
<properties>
- <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
+
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
+ <flink.version>1.15.4</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>manager-common</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>manager-workflow</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-flink-dependencies-v1.13</artifactId>
+ <artifactId>sort-flink-dependencies-v1.15</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-file-sink-common</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -65,59 +50,47 @@
<version>${flink.version}</version>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ <version>2.12.4-15.0</version>
</dependency>
+
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <scope>test</scope>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
+ <finalName>manager-plugins-flink-v1.15</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-resources-plugin</artifactId>
+ <artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
- <id>copy-resources</id>
+ <id>copy-dependencies</id>
<goals>
- <goal>copy-resources</goal>
+ <goal>copy-dependencies</goal>
</goals>
- <phase>prepare-package</phase>
+ <phase>package</phase>
<configuration>
- <outputDirectory>target/plugins</outputDirectory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
+ <outputDirectory>target/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <finalName>plugins</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/main/assembly/assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <executions>
- <execution>
- <id>plugins</id>
- <goals>
- <goal>single</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
+
</project>
diff --git
a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
new file mode 100644
index 0000000000..390db34188
--- /dev/null
+++
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Flink client service: queries job status and details, and stops or cancels jobs via a REST cluster client.
+ */
+@Slf4j
+public class FlinkClientService {
+
+ private final Configuration configuration;
+
+ public FlinkClientService(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ /**
+ * Get the Flink Client.
+ */
+ public RestClusterClient<StandaloneClusterId> getFlinkClient() throws
Exception {
+ try {
+ return new RestClusterClient<>(configuration,
StandaloneClusterId.getInstance());
+ } catch (Exception e) {
+ log.error("get flink client failed: ", e);
+ throw new Exception("get flink client failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Get the job status by the given job id.
+ */
+ public JobStatus getJobStatus(String jobId) throws Exception {
+ try {
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<JobStatus> jobStatus =
client.getJobStatus(jobID);
+ return jobStatus.get();
+ } catch (Exception e) {
+ log.error("get job status by jobId={} failed: ", jobId, e);
+ throw new Exception("get job status by jobId=" + jobId + " failed:
" + e.getMessage());
+ }
+ }
+
+ /**
+ * Get job detail by the given job id.
+ */
+ public JobDetailsInfo getJobDetail(String jobId) throws Exception {
+ try {
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<JobDetailsInfo> jobDetails =
client.getJobDetails(jobID);
+ return jobDetails.get();
+ } catch (Exception e) {
+ log.error("get job detail by jobId={} failed: ", jobId, e);
+ throw new Exception("get job detail by jobId=" + jobId + " failed:
" + e.getMessage());
+ }
+ }
+
+ /**
+ * Stop the Flink job with the savepoint.
+ */
+ public String stopJob(String jobId, boolean isDrain, String
savepointDirectory) throws Exception {
+ try {
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ JobID jobID = JobID.fromHexString(jobId);
+ CompletableFuture<String> stopResult =
client.stopWithSavepoint(jobID, isDrain, savepointDirectory,
+ SavepointFormatType.CANONICAL);
+ return stopResult.get();
+ } catch (Exception e) {
+ log.error("stop job {} failed and savepoint directory is {} : ",
jobId, savepointDirectory, e);
+ throw new Exception("stop job " + jobId + " failed: " +
e.getMessage());
+ }
+ }
+
+ /**
+ * Cancel the Flink job.
+ */
+ public void cancelJob(String jobId) throws Exception {
+ try {
+ RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+ JobID jobID = JobID.fromHexString(jobId);
+ client.cancel(jobID);
+ } catch (Exception e) {
+ log.error("cancel job {} failed: ", jobId, e);
+ throw new Exception("cancel job " + jobId + " failed: " +
e.getMessage());
+ }
+ }
+}
diff --git a/inlong-manager/manager-plugins/pom.xml
b/inlong-manager/manager-plugins/pom.xml
index c60b924f7e..bbff6fee0a 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/pom.xml
@@ -25,99 +25,17 @@
</parent>
<artifactId>manager-plugins</artifactId>
+ <packaging>pom</packaging>
<name>Apache InLong - Manager Plugins</name>
+ <modules>
+ <module>manager-plugins-flink-v1.13</module>
+ <module>manager-plugins-flink-v1.15</module>
+ <module>base</module>
+ </modules>
+
<properties>
<inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
</properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>manager-common</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>manager-workflow</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-flink-dependencies-v1.13</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-resources</id>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <phase>prepare-package</phase>
- <configuration>
- <outputDirectory>target/plugins</outputDirectory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <finalName>plugins</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/main/assembly/assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <executions>
- <execution>
- <id>plugins</id>
- <goals>
- <goal>single</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
</project>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
index 60f844ab4b..08d04b58ee 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
@@ -144,7 +144,8 @@ public class PluginClassLoader extends URLClassLoader {
List<PluginDefinition> definitions = new ArrayList<>();
for (File jarFile : files) {
- if (!jarFile.getName().endsWith(".jar")) {
+ String jarName = jarFile.getName();
+ if (!jarName.endsWith(".jar") ||
!jarName.contains("plugins-base")) {
log.warn("invalid plugin jar {}, skip to load", jarFile);
continue;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
index 9c6fd2ea0d..b349ea445f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
@@ -117,5 +117,4 @@ public class PluginService implements InitializingBean {
}
}
}
-
}
diff --git
a/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar
b/inlong-manager/manager-service/src/test/resources/plugins/manager-plugins-base-example.jar
similarity index 100%
rename from
inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar
rename to
inlong-manager/manager-service/src/test/resources/plugins/manager-plugins-base-example.jar
diff --git a/inlong-manager/manager-web/assembly.xml
b/inlong-manager/manager-web/assembly.xml
index a9ecc5d24d..a54091a3ea 100755
--- a/inlong-manager/manager-web/assembly.xml
+++ b/inlong-manager/manager-web/assembly.xml
@@ -62,6 +62,11 @@
<fileSet>
<directory>${build.directory}/lib</directory>
<outputDirectory>lib</outputDirectory>
+ <excludes>
+ <exclude>*scala*.jar</exclude>
+ <exclude>flink-*.jar</exclude>
+ <exclude>sort-flink-*.jar</exclude>
+ </excludes>
</fileSet>
<!-- Package the project startup jar into the lib directory -->
@@ -75,7 +80,7 @@
<!-- Package the manager plugins -->
<fileSet>
- <directory>../manager-plugins/target/plugins</directory>
+ <directory>../manager-plugins/base/target/plugins</directory>
<outputDirectory>plugins</outputDirectory>
</fileSet>
diff --git a/inlong-manager/manager-web/bin/startup.sh
b/inlong-manager/manager-web/bin/startup.sh
index 8ad446ebc5..f641314e8a 100755
--- a/inlong-manager/manager-web/bin/startup.sh
+++ b/inlong-manager/manager-web/bin/startup.sh
@@ -53,8 +53,10 @@ BASE_PATH=$(pwd)
# The absolute directory of the external configuration file, if the directory
needs / end, you can also directly specify the file
# If you specify a directory, spring will read all configuration files in the
directory
+FLINK_VERSION=$(grep "^flink.version="
${BASE_PATH}"/plugins/flink-sort-plugin.properties" | awk -F= '{print $2}')
CONFIG_DIR=${BASE_PATH}"/conf/"
-JAR_LIBS=${BASE_PATH}"/lib/*"
+# Base dependencies plus the Flink dependencies matching the configured flink.version
+JAR_LIBS=${BASE_PATH}"/lib/*:"${BASE_PATH}"/plugins/flink-v"${FLINK_VERSION}"/*"
JAR_MAIN=${BASE_PATH}"/lib/"${APPLICATION_JAR}
CLASSPATH=${CONFIG_DIR}:${JAR_LIBS}:${JAR_MAIN}
MAIN_CLASS=org.apache.inlong.manager.web.InlongManagerMain
diff --git a/inlong-manager/manager-web/pom.xml
b/inlong-manager/manager-web/pom.xml
index b8246871e1..86faedac78 100644
--- a/inlong-manager/manager-web/pom.xml
+++ b/inlong-manager/manager-web/pom.xml
@@ -45,7 +45,7 @@
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>manager-plugins</artifactId>
+ <artifactId>manager-plugins-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml
index 2804752891..d37b57d627 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml
@@ -71,6 +71,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
index 281add0fae..9c659995ca 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
@@ -35,6 +35,9 @@ import static
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_
import static
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static
com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
/** Factory for creating configured instance of {@link PostgreSQLTableSource}.
*/
public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
@@ -176,6 +179,9 @@ public class PostgreSQLTableFactory implements
DynamicTableSourceFactory {
options.add(PORT);
options.add(DECODING_PLUGIN_NAME);
options.add(CHANGELOG_MODE);
+ options.add(INLONG_METRIC);
+ options.add(AUDIT_KEYS);
+ options.add(INLONG_AUDIT);
return options;
}
}