This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 882f5d7a8c [INLONG-9768][Manager] Optimize flink job building and
manage procedure (#9769)
882f5d7a8c is described below
commit 882f5d7a8cfcb2dcaedef2650d78983eabee53a0
Author: AloysZhang <[email protected]>
AuthorDate: Tue Mar 5 17:32:22 2024 +0800
[INLONG-9768][Manager] Optimize flink job building and manage procedure
(#9769)
---
.../manager/plugin/flink/FlinkOperation.java | 80 +++++----------
.../inlong/manager/plugin/flink/FlinkService.java | 110 +++++++++++++-------
.../plugin/flink/IntegrationTaskRunner.java | 14 +--
.../plugin/listener/DeleteSortListener.java | 4 +-
.../plugin/listener/RestartSortListener.java | 5 +-
.../plugin/listener/StartupSortListener.java | 6 +-
.../plugin/listener/StartupStreamListener.java | 6 +-
.../plugin/listener/SuspendSortListener.java | 5 +-
.../manager/plugin/poller/SortStatusPoller.java | 4 +-
.../manager/plugin/util/FlinkConfiguration.java | 113 ---------------------
.../manager/plugin/util/FlinkServiceUtils.java | 59 -----------
.../inlong/manager/plugin/util/FlinkUtils.java | 113 ++++++++++++---------
12 files changed, 175 insertions(+), 344 deletions(-)
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 9f5d9f7708..bf2ece030b 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -68,10 +68,14 @@ public class FlinkOperation {
private static final String CONNECTOR_JAR_PATTERN =
"^sort-connector-(?i)(%s).*jar$";
private static final String ALL_CONNECTOR_JAR_PATTERN =
"^sort-connector-.*jar$";
private static Properties properties;
- private final FlinkService flinkService;
- public FlinkOperation(FlinkService flinkService) {
- this.flinkService = flinkService;
+ private static class FlinkOperationHolder {
+
+ private static final FlinkOperation INSTANCE = new FlinkOperation();
+ }
+
+ public static FlinkOperation getInstance() {
+ return FlinkOperationHolder.INSTANCE;
}
/**
@@ -103,16 +107,15 @@ public class FlinkOperation {
* Restart the Flink job.
*/
public void restart(FlinkInfo flinkInfo) throws Exception {
- String jobId = flinkInfo.getJobId();
- boolean terminated = isNullOrTerminated(jobId);
+ boolean terminated = isNullOrTerminated(flinkInfo);
if (terminated) {
- String message = String.format("restart job failed, as " +
JOB_TERMINATED_MSG, jobId);
+ String message = String.format("restart job failed, as " +
JOB_TERMINATED_MSG, flinkInfo.getJobId());
log.error(message);
throw new Exception(message);
}
Future<?> future = TaskRunService.submit(
- new IntegrationTaskRunner(flinkService, flinkInfo,
TaskCommitType.RESTART.getCode()));
+ new IntegrationTaskRunner(flinkInfo,
TaskCommitType.RESTART.getCode()));
future.get();
}
@@ -120,24 +123,25 @@ public class FlinkOperation {
* Start the Flink job, if the job id was not empty, restore it.
*/
public void start(FlinkInfo flinkInfo) throws Exception {
- String jobId = flinkInfo.getJobId();
try {
// Start a new task without savepoint
- if (StringUtils.isEmpty(jobId)) {
- IntegrationTaskRunner taskRunner = new
IntegrationTaskRunner(flinkService, flinkInfo,
+ if (StringUtils.isEmpty(flinkInfo.getJobId())) {
+ IntegrationTaskRunner taskRunner = new
IntegrationTaskRunner(flinkInfo,
TaskCommitType.START_NOW.getCode());
Future<?> future = TaskRunService.submit(taskRunner);
future.get();
} else {
// Restore an old task with savepoint
- boolean noSavepoint = isNullOrTerminated(jobId) ||
StringUtils.isEmpty(flinkInfo.getSavepointPath());
+ boolean noSavepoint =
+ isNullOrTerminated(flinkInfo) ||
StringUtils.isEmpty(flinkInfo.getSavepointPath());
if (noSavepoint) {
- String message = String.format("restore job failed, as " +
JOB_TERMINATED_MSG, jobId);
+ String message =
+ String.format("restore job failed, as " +
JOB_TERMINATED_MSG, flinkInfo.getJobId());
log.error(message);
throw new Exception(message);
}
- IntegrationTaskRunner taskRunner = new
IntegrationTaskRunner(flinkService, flinkInfo,
+ IntegrationTaskRunner taskRunner = new
IntegrationTaskRunner(flinkInfo,
TaskCommitType.RESUME.getCode());
Future<?> future = TaskRunService.submit(taskRunner);
future.get();
@@ -148,37 +152,6 @@ public class FlinkOperation {
}
}
- /**
- * Check whether there are duplicate NodeIds in different relations.
- * <p/>
- * The JSON data in the dataflow is in the reverse order of the nodes in
the actual dataflow.
- * For example, data flow A -> B -> C, the generated topological
relationship is [[B,C],[A,B]],
- * then the input node B in the first relation [B,C] is the second output
node B in relation [A,B].
- * <p/>
- * The example of dataflow:
- * <blockquote><pre>
- * {
- * "groupId": "test_group",
- * "streams": [
- * {
- * "streamId": "test_stream",
- * "relations": [
- * {
- * "type": "baseRelation",
- * "inputs": [ "node_3" ],
- * "outputs": [ "node_4" ]
- * },
- * {
- * "type": "innerJoin",
- * "inputs": [ "node_1", "node_2" ],
- * "outputs": [ "node_3" ]
- * }
- * ]
- * }
- * ]
- * }
- * </pre></blockquote>
- */
private void checkNodeIds(String dataflow) throws Exception {
JsonNode relations =
JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS)
.get(0).get(InlongConstants.RELATIONS);
@@ -293,16 +266,15 @@ public class FlinkOperation {
* Stop the Flink job.
*/
public void stop(FlinkInfo flinkInfo) throws Exception {
- String jobId = flinkInfo.getJobId();
- boolean terminated = isNullOrTerminated(jobId);
+ boolean terminated = isNullOrTerminated(flinkInfo);
if (terminated) {
- String message = String.format("stop job failed, as " +
JOB_TERMINATED_MSG, jobId);
+ String message = String.format("stop job failed, as " +
JOB_TERMINATED_MSG, flinkInfo.getJobId());
log.error(message);
throw new Exception(message);
}
Future<?> future = TaskRunService.submit(
- new IntegrationTaskRunner(flinkService, flinkInfo,
TaskCommitType.STOP.getCode()));
+ new IntegrationTaskRunner(flinkInfo,
TaskCommitType.STOP.getCode()));
future.get();
}
@@ -311,7 +283,7 @@ public class FlinkOperation {
*/
public void delete(FlinkInfo flinkInfo) throws Exception {
String jobId = flinkInfo.getJobId();
- JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+ JobDetailsInfo jobDetailsInfo =
FlinkService.getInstance().getJobDetail(flinkInfo);
if (jobDetailsInfo == null) {
throw new Exception(String.format("delete job failed as the job
[%s] not found", jobId));
}
@@ -324,7 +296,7 @@ public class FlinkOperation {
}
Future<?> future = TaskRunService.submit(
- new IntegrationTaskRunner(flinkService, flinkInfo,
TaskCommitType.DELETE.getCode()));
+ new IntegrationTaskRunner(flinkInfo,
TaskCommitType.DELETE.getCode()));
future.get();
}
@@ -343,7 +315,7 @@ public class FlinkOperation {
int retryTimes = 0;
while (retryTimes <= TRY_MAX_TIMES) {
try {
- JobDetailsInfo jobDetailsInfo =
flinkService.getJobDetail(jobId);
+ JobDetailsInfo jobDetailsInfo =
FlinkService.getInstance().getJobDetail(flinkInfo);
if (jobDetailsInfo == null) {
log.error("job detail not found by {}", jobId);
throw new Exception(String.format("job detail not found by
%s", jobId));
@@ -371,11 +343,11 @@ public class FlinkOperation {
/**
* Check whether the job was terminated by the given job id.
*/
- private boolean isNullOrTerminated(String jobId) throws Exception {
- JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+ private boolean isNullOrTerminated(FlinkInfo flinkInfo) throws Exception {
+ JobDetailsInfo jobDetailsInfo =
FlinkService.getInstance().getJobDetail(flinkInfo);
boolean terminated = jobDetailsInfo == null ||
jobDetailsInfo.getJobStatus() == null;
if (terminated) {
- log.warn("job detail or job status was null for [{}]", jobId);
+ log.warn("job detail or job status was null for [{}]",
flinkInfo.getJobId());
return true;
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 3732279889..69a32b73fa 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -22,8 +22,7 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
-import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
-import org.apache.inlong.manager.plugin.util.FlinkServiceUtils;
+import org.apache.inlong.manager.plugin.util.FlinkUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -64,54 +63,47 @@ public class FlinkService {
private final FlinkConfig flinkConfig;
private final Integer parallelism;
private final String savepointDirectory;
- private final Configuration configuration;
- private final FlinkClientService clientService;
+ // map endpoint to Configuration
+ private final Map<String, Configuration> configurations = new HashMap<>();
+ // map Configuration to FlinkClientService
+ private final Map<Configuration, FlinkClientService> flinkClientServices =
new HashMap<>();
/**
* Constructor of FlinkService.
*/
- public FlinkService(String endpoint) throws Exception {
- FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
- flinkConfig = flinkConfiguration.getFlinkConfig();
+ public FlinkService() throws Exception {
+ flinkConfig = FlinkUtils.getFlinkConfigFromFile();
parallelism = flinkConfig.getParallelism();
savepointDirectory = flinkConfig.getSavepointDirectory();
+ }
+
+ private static class FlinkServiceHolder {
- configuration = new Configuration();
- Integer jobManagerPort = flinkConfig.getJobManagerPort();
- configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
- Integer port;
- String address;
- if (StringUtils.isEmpty(endpoint)) {
- address = flinkConfig.getAddress();
- port = flinkConfig.getPort();
- } else {
- Map<String, String> ipPort = translateFromEndpoint(endpoint);
- if (ipPort.isEmpty()) {
- throw new BusinessException("get address:port failed from
endpoint " + endpoint);
+ private static final FlinkService INSTANCE;
+ static {
+ try {
+ INSTANCE = new FlinkService();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- address = ipPort.get("address");
- port = Integer.valueOf(ipPort.get("port"));
}
- configuration.setString(JobManagerOptions.ADDRESS, address);
- configuration.setInteger(RestOptions.PORT, port);
+ }
- clientService = (FlinkClientService)
FlinkServiceUtils.getFlinkClientService(configuration, flinkConfig);
+ public static FlinkService getInstance() {
+ return FlinkServiceHolder.INSTANCE;
}
/**
* Translate the Endpoint to address & port
*/
- private Map<String, String> translateFromEndpoint(String endpoint) throws
Exception {
+ private Map<String, String> translateFromEndpoint(String endpoint) {
Map<String, String> map = new HashMap<>(2);
Matcher matcher = IP_PORT_PATTERN.matcher(endpoint);
if (matcher.find()) {
map.put("address", matcher.group(1));
map.put("port", matcher.group(2));
- return map;
- } else {
- throw new Exception("endpoint [" + endpoint + "] was not match
address:port");
}
+ return map;
}
/**
@@ -124,15 +116,52 @@ public class FlinkService {
/**
* Get the job status by the given job id.
*/
- public JobStatus getJobStatus(String jobId) throws Exception {
- return clientService.getJobStatus(jobId);
+ public JobStatus getJobStatus(String endpoint, String jobId) throws
Exception {
+ Configuration configuration = getFlinkConfiguration(endpoint);
+ return getFlinkClientService(configuration).getJobStatus(jobId);
+ }
+
+ public JobStatus getJobStatus(FlinkInfo flinkInfo) throws Exception {
+ Configuration configuration =
getFlinkConfiguration(flinkInfo.getEndpoint());
+ return
getFlinkClientService(configuration).getJobStatus(flinkInfo.getJobId());
+ }
+
+ private FlinkClientService getFlinkClientService(Configuration
configuration) {
+ return flinkClientServices.computeIfAbsent(configuration,
+ k -> (FlinkClientService)
FlinkUtils.getFlinkClientService(configuration, flinkConfig));
+ }
+
+ private Configuration getFlinkConfiguration(String endpoint) {
+ return configurations.computeIfAbsent(endpoint,
+ k -> {
+ Integer port;
+ String address;
+ if (StringUtils.isEmpty(endpoint)) {
+ address = flinkConfig.getAddress();
+ port = flinkConfig.getPort();
+ } else {
+ Map<String, String> ipPort =
translateFromEndpoint(endpoint);
+ if (ipPort.isEmpty()) {
+ throw new BusinessException("get address:port
failed from endpoint " + endpoint);
+ }
+ address = ipPort.get("address");
+ port = Integer.valueOf(ipPort.get("port"));
+ }
+ // build flink configuration
+ Configuration configuration = new Configuration();
+ configuration.setInteger(JobManagerOptions.PORT,
flinkConfig.getJobManagerPort());
+ configuration.setString(JobManagerOptions.ADDRESS,
address);
+ configuration.setInteger(RestOptions.PORT, port);
+ return configuration;
+ });
}
/**
* Get job detail by the given job id.
*/
- public JobDetailsInfo getJobDetail(String jobId) throws Exception {
- return clientService.getJobDetail(jobId);
+ public JobDetailsInfo getJobDetail(FlinkInfo flinkInfo) throws Exception {
+ Configuration configuration =
getFlinkConfiguration(flinkInfo.getEndpoint());
+ return
getFlinkClientService(configuration).getJobDetail(flinkInfo.getJobId());
}
/**
@@ -182,6 +211,8 @@ public class FlinkService {
}
}).filter(Objects::nonNull).collect(Collectors.toList());
+ Configuration configuration =
getFlinkConfiguration(flinkInfo.getEndpoint());
+
PackagedProgram program = PackagedProgram.newBuilder()
.setConfiguration(configuration)
.setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
@@ -192,7 +223,7 @@ public class FlinkService {
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program,
configuration, parallelism, false);
jobGraph.addJars(connectorJars);
- RestClusterClient<StandaloneClusterId> client =
clientService.getFlinkClient();
+ RestClusterClient<StandaloneClusterId> client =
getFlinkClientService(configuration).getFlinkClient();
CompletableFuture<JobID> result = client.submitJob(jobGraph);
return result.get().toString();
}
@@ -200,15 +231,18 @@ public class FlinkService {
/**
* Stop the Flink job with the savepoint.
*/
- public String stopJob(String jobId, StopWithSavepointRequest request)
throws Exception {
- return clientService.stopJob(jobId, request.isDrain(),
request.getTargetDirectory());
+ public String stopJob(FlinkInfo flinkInfo, StopWithSavepointRequest
request) throws Exception {
+ Configuration configuration =
getFlinkConfiguration(flinkInfo.getEndpoint());
+ return
getFlinkClientService(configuration).stopJob(flinkInfo.getJobId(),
request.isDrain(),
+ request.getTargetDirectory());
}
/**
* Cancel the Flink job.
*/
- public void cancelJob(String jobId) throws Exception {
- clientService.cancelJob(jobId);
+ public void cancelJob(FlinkInfo flinkInfo) throws Exception {
+ Configuration configuration =
getFlinkConfiguration(flinkInfo.getEndpoint());
+ getFlinkClientService(configuration).cancelJob(flinkInfo.getJobId());
}
/**
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
index 22003b2925..212c25397c 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
@@ -40,10 +40,10 @@ public class IntegrationTaskRunner implements Runnable {
private final FlinkInfo flinkInfo;
private final Integer commitType;
- public IntegrationTaskRunner(FlinkService flinkService, FlinkInfo
flinkInfo, Integer commitType) {
- this.flinkService = flinkService;
+ public IntegrationTaskRunner(FlinkInfo flinkInfo, Integer commitType) {
this.flinkInfo = flinkInfo;
this.commitType = commitType;
+ flinkService = FlinkService.getInstance();
}
@Override
@@ -84,12 +84,12 @@ public class IntegrationTaskRunner implements Runnable {
FlinkConfig flinkConfig = flinkService.getFlinkConfig();
stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
- String location =
flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
+ String location = flinkService.stopJob(flinkInfo,
stopWithSavepointRequest);
flinkInfo.setSavepointPath(location);
log.info("the jobId: {} savepoint: {} ",
flinkInfo.getJobId(), location);
int times = 0;
while (times < TRY_MAX_TIMES) {
- JobStatus jobStatus =
flinkService.getJobStatus(flinkInfo.getJobId());
+ JobStatus jobStatus =
flinkService.getJobStatus(flinkInfo);
// restore job
if (jobStatus == FINISHED) {
try {
@@ -123,7 +123,7 @@ public class IntegrationTaskRunner implements Runnable {
FlinkConfig flinkConfig = flinkService.getFlinkConfig();
stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
- String location =
flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
+ String location = flinkService.stopJob(flinkInfo,
stopWithSavepointRequest);
flinkInfo.setSavepointPath(location);
log.info("the jobId {} savepoint: {} ",
flinkInfo.getJobId(), location);
} catch (Exception e) {
@@ -136,9 +136,9 @@ public class IntegrationTaskRunner implements Runnable {
break;
case DELETE:
try {
- flinkService.cancelJob(flinkInfo.getJobId());
+ flinkService.cancelJob(flinkInfo);
log.info("delete job {} success in backend",
flinkInfo.getJobId());
- JobStatus jobStatus =
flinkService.getJobStatus(flinkInfo.getJobId());
+ JobStatus jobStatus = flinkService.getJobStatus(flinkInfo);
if (jobStatus.isTerminalState()) {
log.info("delete job {} success in backend",
flinkInfo.getJobId());
} else {
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index a7918c640a..009374e2a6 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -22,7 +22,6 @@ import
org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
@@ -117,8 +116,7 @@ public class DeleteSortListener implements
SortOperateListener {
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
- FlinkService flinkService = new
FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+ FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.delete(flinkInfo);
log.info("job delete success for jobId={}", jobId);
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 9a95da354a..0c6828c984 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -22,7 +22,6 @@ import
org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -127,9 +126,7 @@ public class RestartSortListener implements
SortOperateListener {
flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
-
- FlinkService flinkService = new
FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+ FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.genPath(flinkInfo, dataflow);
// todo Currently, savepoint is not being used to restart, but
will be improved in the future
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 8fa72f1c4b..0b0e55e369 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -23,7 +23,6 @@ import
org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -139,10 +138,7 @@ public class StartupSortListener implements
SortOperateListener {
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
-
- FlinkService flinkService = new
FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-
+ FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.genPath(flinkInfo, dataflow);
flinkOperation.start(flinkInfo);
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 99c0245168..c66f76d467 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -23,7 +23,6 @@ import
org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -132,10 +131,7 @@ public class StartupStreamListener implements
SortOperateListener {
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
-
- FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-
+ FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.genPath(flinkInfo, dataflow);
flinkOperation.start(flinkInfo);
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 06a76e1bf7..72794e5006 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -22,7 +22,6 @@ import
org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
@@ -116,9 +115,7 @@ public class SuspendSortListener implements
SortOperateListener {
flinkInfo.setJobId(jobId);
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
-
- FlinkService flinkService = new
FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+ FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
// todo Currently, savepoint is not being used to stop, but
will be improved in the future
flinkOperation.delete(flinkInfo);
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
index 84509b8a0e..88c984c760 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
@@ -101,9 +101,9 @@ public class SortStatusPoller implements SortPoller {
}
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- FlinkService flinkService = new FlinkService(sortUrl);
statusInfo.setSortStatus(
-
JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId),
SortStatus.UNKNOWN));
+
JOB_SORT_STATUS_MAP.getOrDefault(FlinkService.getInstance().getJobStatus(sortUrl,
jobId),
+ SortStatus.UNKNOWN));
statusInfos.add(statusInfo);
} catch (Exception e) {
log.error("polling sort status failed for groupId=" +
streamInfo.getInlongGroupId() + " streamId="
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
deleted file mode 100644
index d5f3d8ed6e..0000000000
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.util;
-
-import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Properties;
-
-import static
org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
-import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
-import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
-import static
org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
-import static
org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
-import static
org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
-import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
-import static
org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
-
-/**
- * Configuration file for Flink, only one instance in the process.
- * Basically it used properties file to store.
- */
-public class FlinkConfiguration {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkConfiguration.class);
-
- private static final String DEFAULT_CONFIG_FILE =
"flink-sort-plugin.properties";
- private static final String INLONG_MANAGER = "inlong-manager";
-
- private final FlinkConfig flinkConfig;
-
- /**
- * load config from flink file.
- */
- public FlinkConfiguration() throws Exception {
- String path = formatPath();
- flinkConfig = getFlinkConfigFromFile(path);
- }
-
- /**
- * fetch DEFAULT_CONFIG_FILE full path
- */
- private String formatPath() throws Exception {
- String path =
this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
- LOGGER.info("format first path {}", path);
-
- int index = path.indexOf(INLONG_MANAGER);
- if (index == -1) {
- throw new Exception(INLONG_MANAGER + " path not found in " + path);
- }
-
- path = path.substring(0, index);
- String confPath = path + INLONG_MANAGER + File.separator + "plugins" +
File.separator + DEFAULT_CONFIG_FILE;
- File file = new File(confPath);
- if (!file.exists()) {
- String message = String.format("not found %s in path %s",
DEFAULT_CONFIG_FILE, confPath);
- LOGGER.error(message);
- throw new Exception(message);
- }
-
- LOGGER.info("after format, {} located in {}", DEFAULT_CONFIG_FILE,
confPath);
- return confPath;
- }
-
- /**
- * get flink config
- */
- public FlinkConfig getFlinkConfig() {
- return flinkConfig;
- }
-
- /**
- * parse properties
- */
- private FlinkConfig getFlinkConfigFromFile(String fileName) throws
IOException {
- Properties properties = new Properties();
- try (BufferedReader bufferedReader = new BufferedReader(new
FileReader(fileName))) {
- properties.load(bufferedReader);
- }
- FlinkConfig flinkConfig = new FlinkConfig();
- flinkConfig.setPort(Integer.valueOf(properties.getProperty(PORT)));
- flinkConfig.setAddress(properties.getProperty(ADDRESS));
-
flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM)));
-
flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY));
-
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
-
flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
-
flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY));
- flinkConfig.setVersion(properties.getProperty(FLINK_VERSION));
- return flinkConfig;
- }
-
-}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
deleted file mode 100644
index 82cd665538..0000000000
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.util;
-
-import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.configuration.Configuration;
-
-import java.io.File;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-@Slf4j
-public class FlinkServiceUtils {
-
- private static final String DEFAULT_PLUGINS = "plugins";
-
- private static final String FILE_PREFIX = "file://";
-
- public static Object getFlinkClientService(Configuration configuration,
FlinkConfig flinkConfig) {
- log.info("Flink version {}", flinkConfig.getVersion());
-
- Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath();
- String flinkJarName = String.format(Constants.FLINK_JAR_NAME,
flinkConfig.getVersion());
- String flinkClientPath = FILE_PREFIX + pluginPath + File.separator +
flinkJarName;
- log.info("Start to load Flink jar: {}", flinkClientPath);
-
- try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new
URL(flinkClientPath)}, Thread.currentThread()
- .getContextClassLoader())) {
- Class<?> flinkClientService =
classLoader.loadClass(Constants.FLINK_CLIENT_CLASS);
- Object flinkService =
flinkClientService.getDeclaredConstructor(Configuration.class)
- .newInstance(configuration);
- log.info("Successfully loaded Flink service");
- return flinkService;
- } catch (Exception e) {
- log.error("Failed to loaded Flink service, please check flink
client jar path: {}", flinkClientPath);
- throw new RuntimeException(e);
- }
- }
-}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index c3daefecfb..61e7aac484 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -17,25 +17,40 @@
package org.apache.inlong.manager.plugin.util;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.Objects;
+import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static
org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
+import static
org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
+import static
org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
+import static
org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
+import static
org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
+
/**
* Util of flink.
*/
@@ -43,27 +58,9 @@ import java.util.regex.Pattern;
public class FlinkUtils {
public static final String BASE_DIRECTORY = "config";
-
- public static final List<String> FLINK_VERSION_COLLECTION =
Collections.singletonList("Flink-1.13");
-
- /**
- * getLatestFlinkVersion
- */
- public static String getLatestFlinkVersion(String[] supportedFlink) {
- if (Objects.isNull(supportedFlink)) {
- return null;
- }
- Arrays.sort(supportedFlink, Collections.reverseOrder());
- String latestFinkVersion = null;
- for (String flinkVersion : supportedFlink) {
- latestFinkVersion = FLINK_VERSION_COLLECTION.stream()
- .filter(v ->
v.equals(flinkVersion)).findFirst().orElse(null);
- if (Objects.nonNull(latestFinkVersion)) {
- return latestFinkVersion;
- }
- }
- return latestFinkVersion;
- }
+ private static final String DEFAULT_PLUGINS = "plugins";
+ private static final String FILE_PREFIX = "file://";
+ private static final String DEFAULT_CONFIG_FILE =
"flink-sort-plugin.properties";
/**
* print exception
@@ -127,13 +124,6 @@ public class FlinkUtils {
return result;
}
- /**
- * get value
- */
- public static String getValue(String key, String defaultValue) {
- return StringUtils.isNotEmpty(key) ? key : defaultValue;
- }
-
/**
* getConfigDirectory
*
@@ -171,23 +161,46 @@ public class FlinkUtils {
return true;
}
- /**
- * Delete configuration file
- *
- * @param name file config info
- * @return whether sucess
- */
- public static boolean deleteConfigFile(String name) {
- String configDirectory = getConfigDirectory(name);
- File file = new File(configDirectory);
- if (file.exists()) {
- try {
- FileUtils.deleteDirectory(file);
- } catch (IOException e) {
- log.error("delete {} failed", configDirectory, e);
- return false;
- }
+ public static Object getFlinkClientService(Configuration configuration,
FlinkConfig flinkConfig) {
+ log.info("Flink version {}", flinkConfig.getVersion());
+
+ Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath();
+ String flinkJarName = String.format(Constants.FLINK_JAR_NAME,
flinkConfig.getVersion());
+ String flinkClientPath = FILE_PREFIX + pluginPath + File.separator +
flinkJarName;
+ log.info("Start to load Flink jar: {}", flinkClientPath);
+
+ try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new
URL(flinkClientPath)}, Thread.currentThread()
+ .getContextClassLoader())) {
+ Class<?> flinkClientService =
classLoader.loadClass(Constants.FLINK_CLIENT_CLASS);
+ Object flinkService =
flinkClientService.getDeclaredConstructor(Configuration.class)
+ .newInstance(configuration);
+ log.info("Successfully loaded Flink service");
+ return flinkService;
+ } catch (Exception e) {
+ log.error("Failed to loaded Flink service, please check flink
client jar path: {}", flinkClientPath);
+ throw new RuntimeException(e);
}
- return true;
+ }
+
+ public static FlinkConfig getFlinkConfigFromFile() throws Exception {
+ Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath();
+ String defaultConfigFilePath = pluginPath + File.separator +
DEFAULT_CONFIG_FILE;
+
+ log.info("Start to load Flink config from file: {}",
defaultConfigFilePath);
+
+ Properties properties = new Properties();
+ try (BufferedReader bufferedReader = new BufferedReader(new
FileReader(defaultConfigFilePath))) {
+ properties.load(bufferedReader);
+ }
+ FlinkConfig flinkConfig = new FlinkConfig();
+ flinkConfig.setPort(Integer.valueOf(properties.getProperty(PORT)));
+ flinkConfig.setAddress(properties.getProperty(ADDRESS));
+
flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM)));
+
flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY));
+
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
+
flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
+
flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY));
+ flinkConfig.setVersion(properties.getProperty(FLINK_VERSION));
+ return flinkConfig;
}
}