This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 882f5d7a8c [INLONG-9768][Manager] Optimize flink job building and 
manage procedure (#9769)
882f5d7a8c is described below

commit 882f5d7a8cfcb2dcaedef2650d78983eabee53a0
Author: AloysZhang <[email protected]>
AuthorDate: Tue Mar 5 17:32:22 2024 +0800

    [INLONG-9768][Manager] Optimize flink job building and manage procedure 
(#9769)
---
 .../manager/plugin/flink/FlinkOperation.java       |  80 +++++----------
 .../inlong/manager/plugin/flink/FlinkService.java  | 110 +++++++++++++-------
 .../plugin/flink/IntegrationTaskRunner.java        |  14 +--
 .../plugin/listener/DeleteSortListener.java        |   4 +-
 .../plugin/listener/RestartSortListener.java       |   5 +-
 .../plugin/listener/StartupSortListener.java       |   6 +-
 .../plugin/listener/StartupStreamListener.java     |   6 +-
 .../plugin/listener/SuspendSortListener.java       |   5 +-
 .../manager/plugin/poller/SortStatusPoller.java    |   4 +-
 .../manager/plugin/util/FlinkConfiguration.java    | 113 ---------------------
 .../manager/plugin/util/FlinkServiceUtils.java     |  59 -----------
 .../inlong/manager/plugin/util/FlinkUtils.java     | 113 ++++++++++++---------
 12 files changed, 175 insertions(+), 344 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 9f5d9f7708..bf2ece030b 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -68,10 +68,14 @@ public class FlinkOperation {
     private static final String CONNECTOR_JAR_PATTERN = 
"^sort-connector-(?i)(%s).*jar$";
     private static final String ALL_CONNECTOR_JAR_PATTERN = 
"^sort-connector-.*jar$";
     private static Properties properties;
-    private final FlinkService flinkService;
 
-    public FlinkOperation(FlinkService flinkService) {
-        this.flinkService = flinkService;
+    private static class FlinkOperationHolder {
+
+        private static final FlinkOperation INSTANCE = new FlinkOperation();
+    }
+
+    public static FlinkOperation getInstance() {
+        return FlinkOperationHolder.INSTANCE;
     }
 
     /**
@@ -103,16 +107,15 @@ public class FlinkOperation {
      * Restart the Flink job.
      */
     public void restart(FlinkInfo flinkInfo) throws Exception {
-        String jobId = flinkInfo.getJobId();
-        boolean terminated = isNullOrTerminated(jobId);
+        boolean terminated = isNullOrTerminated(flinkInfo);
         if (terminated) {
-            String message = String.format("restart job failed, as " + 
JOB_TERMINATED_MSG, jobId);
+            String message = String.format("restart job failed, as " + 
JOB_TERMINATED_MSG, flinkInfo.getJobId());
             log.error(message);
             throw new Exception(message);
         }
 
         Future<?> future = TaskRunService.submit(
-                new IntegrationTaskRunner(flinkService, flinkInfo, 
TaskCommitType.RESTART.getCode()));
+                new IntegrationTaskRunner(flinkInfo, 
TaskCommitType.RESTART.getCode()));
         future.get();
     }
 
@@ -120,24 +123,25 @@ public class FlinkOperation {
      * Start the Flink job, if the job id was not empty, restore it.
      */
     public void start(FlinkInfo flinkInfo) throws Exception {
-        String jobId = flinkInfo.getJobId();
         try {
             // Start a new task without savepoint
-            if (StringUtils.isEmpty(jobId)) {
-                IntegrationTaskRunner taskRunner = new 
IntegrationTaskRunner(flinkService, flinkInfo,
+            if (StringUtils.isEmpty(flinkInfo.getJobId())) {
+                IntegrationTaskRunner taskRunner = new 
IntegrationTaskRunner(flinkInfo,
                         TaskCommitType.START_NOW.getCode());
                 Future<?> future = TaskRunService.submit(taskRunner);
                 future.get();
             } else {
                 // Restore an old task with savepoint
-                boolean noSavepoint = isNullOrTerminated(jobId) || 
StringUtils.isEmpty(flinkInfo.getSavepointPath());
+                boolean noSavepoint =
+                        isNullOrTerminated(flinkInfo) || 
StringUtils.isEmpty(flinkInfo.getSavepointPath());
                 if (noSavepoint) {
-                    String message = String.format("restore job failed, as " + 
JOB_TERMINATED_MSG, jobId);
+                    String message =
+                            String.format("restore job failed, as " + 
JOB_TERMINATED_MSG, flinkInfo.getJobId());
                     log.error(message);
                     throw new Exception(message);
                 }
 
-                IntegrationTaskRunner taskRunner = new 
IntegrationTaskRunner(flinkService, flinkInfo,
+                IntegrationTaskRunner taskRunner = new 
IntegrationTaskRunner(flinkInfo,
                         TaskCommitType.RESUME.getCode());
                 Future<?> future = TaskRunService.submit(taskRunner);
                 future.get();
@@ -148,37 +152,6 @@ public class FlinkOperation {
         }
     }
 
-    /**
-     * Check whether there are duplicate NodeIds in different relations.
-     * <p/>
-     * The JSON data in the dataflow is in the reverse order of the nodes in 
the actual dataflow.
-     * For example, data flow A -> B -> C, the generated topological 
relationship is [[B,C],[A,B]],
-     * then the input node B in the first relation [B,C] is the second output 
node B in relation [A,B].
-     * <p/>
-     * The example of dataflow:
-     * <blockquote><pre>
-     * {
-     *     "groupId": "test_group",
-     *     "streams": [
-     *         {
-     *             "streamId": "test_stream",
-     *             "relations": [
-     *                 {
-     *                     "type": "baseRelation",
-     *                     "inputs": [ "node_3" ],
-     *                     "outputs": [ "node_4" ]
-     *                 },
-     *                 {
-     *                     "type": "innerJoin",
-     *                     "inputs": [ "node_1", "node_2" ],
-     *                     "outputs": [ "node_3"  ]
-     *                 }
-     *             ]
-     *         }
-     *     ]
-     * }
-     * </pre></blockquote>
-     */
     private void checkNodeIds(String dataflow) throws Exception {
         JsonNode relations = 
JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS)
                 .get(0).get(InlongConstants.RELATIONS);
@@ -293,16 +266,15 @@ public class FlinkOperation {
      * Stop the Flink job.
      */
     public void stop(FlinkInfo flinkInfo) throws Exception {
-        String jobId = flinkInfo.getJobId();
-        boolean terminated = isNullOrTerminated(jobId);
+        boolean terminated = isNullOrTerminated(flinkInfo);
         if (terminated) {
-            String message = String.format("stop job failed, as " + 
JOB_TERMINATED_MSG, jobId);
+            String message = String.format("stop job failed, as " + 
JOB_TERMINATED_MSG, flinkInfo.getJobId());
             log.error(message);
             throw new Exception(message);
         }
 
         Future<?> future = TaskRunService.submit(
-                new IntegrationTaskRunner(flinkService, flinkInfo, 
TaskCommitType.STOP.getCode()));
+                new IntegrationTaskRunner(flinkInfo, 
TaskCommitType.STOP.getCode()));
         future.get();
     }
 
@@ -311,7 +283,7 @@ public class FlinkOperation {
      */
     public void delete(FlinkInfo flinkInfo) throws Exception {
         String jobId = flinkInfo.getJobId();
-        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+        JobDetailsInfo jobDetailsInfo = 
FlinkService.getInstance().getJobDetail(flinkInfo);
         if (jobDetailsInfo == null) {
             throw new Exception(String.format("delete job failed as the job 
[%s] not found", jobId));
         }
@@ -324,7 +296,7 @@ public class FlinkOperation {
         }
 
         Future<?> future = TaskRunService.submit(
-                new IntegrationTaskRunner(flinkService, flinkInfo, 
TaskCommitType.DELETE.getCode()));
+                new IntegrationTaskRunner(flinkInfo, 
TaskCommitType.DELETE.getCode()));
         future.get();
     }
 
@@ -343,7 +315,7 @@ public class FlinkOperation {
         int retryTimes = 0;
         while (retryTimes <= TRY_MAX_TIMES) {
             try {
-                JobDetailsInfo jobDetailsInfo = 
flinkService.getJobDetail(jobId);
+                JobDetailsInfo jobDetailsInfo = 
FlinkService.getInstance().getJobDetail(flinkInfo);
                 if (jobDetailsInfo == null) {
                     log.error("job detail not found by {}", jobId);
                     throw new Exception(String.format("job detail not found by 
%s", jobId));
@@ -371,11 +343,11 @@ public class FlinkOperation {
     /**
      * Check whether the job was terminated by the given job id.
      */
-    private boolean isNullOrTerminated(String jobId) throws Exception {
-        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
+    private boolean isNullOrTerminated(FlinkInfo flinkInfo) throws Exception {
+        JobDetailsInfo jobDetailsInfo = 
FlinkService.getInstance().getJobDetail(flinkInfo);
         boolean terminated = jobDetailsInfo == null || 
jobDetailsInfo.getJobStatus() == null;
         if (terminated) {
-            log.warn("job detail or job status was null for [{}]", jobId);
+            log.warn("job detail or job status was null for [{}]", 
flinkInfo.getJobId());
             return true;
         }
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 3732279889..69a32b73fa 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -22,8 +22,7 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
-import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
-import org.apache.inlong.manager.plugin.util.FlinkServiceUtils;
+import org.apache.inlong.manager.plugin.util.FlinkUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -64,54 +63,47 @@ public class FlinkService {
     private final FlinkConfig flinkConfig;
     private final Integer parallelism;
     private final String savepointDirectory;
-    private final Configuration configuration;
-    private final FlinkClientService clientService;
+    // map endpoint to Configuration
+    private final Map<String, Configuration> configurations = new HashMap<>();
+    // map Configuration to FlinkClientService
+    private final Map<Configuration, FlinkClientService> flinkClientServices = 
new HashMap<>();
 
     /**
      * Constructor of FlinkService.
      */
-    public FlinkService(String endpoint) throws Exception {
-        FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
-        flinkConfig = flinkConfiguration.getFlinkConfig();
+    public FlinkService() throws Exception {
+        flinkConfig = FlinkUtils.getFlinkConfigFromFile();
         parallelism = flinkConfig.getParallelism();
         savepointDirectory = flinkConfig.getSavepointDirectory();
+    }
+
+    private static class FlinkServiceHolder {
 
-        configuration = new Configuration();
-        Integer jobManagerPort = flinkConfig.getJobManagerPort();
-        configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-        Integer port;
-        String address;
-        if (StringUtils.isEmpty(endpoint)) {
-            address = flinkConfig.getAddress();
-            port = flinkConfig.getPort();
-        } else {
-            Map<String, String> ipPort = translateFromEndpoint(endpoint);
-            if (ipPort.isEmpty()) {
-                throw new BusinessException("get address:port failed from 
endpoint " + endpoint);
+        private static final FlinkService INSTANCE;
+        static {
+            try {
+                INSTANCE = new FlinkService();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-            address = ipPort.get("address");
-            port = Integer.valueOf(ipPort.get("port"));
         }
-        configuration.setString(JobManagerOptions.ADDRESS, address);
-        configuration.setInteger(RestOptions.PORT, port);
+    }
 
-        clientService = (FlinkClientService) 
FlinkServiceUtils.getFlinkClientService(configuration, flinkConfig);
+    public static FlinkService getInstance() {
+        return FlinkServiceHolder.INSTANCE;
     }
 
     /**
      * Translate the Endpoint to address & port
      */
-    private Map<String, String> translateFromEndpoint(String endpoint) throws 
Exception {
+    private Map<String, String> translateFromEndpoint(String endpoint) {
         Map<String, String> map = new HashMap<>(2);
         Matcher matcher = IP_PORT_PATTERN.matcher(endpoint);
         if (matcher.find()) {
             map.put("address", matcher.group(1));
             map.put("port", matcher.group(2));
-            return map;
-        } else {
-            throw new Exception("endpoint [" + endpoint + "] was not match 
address:port");
         }
+        return map;
     }
 
     /**
@@ -124,15 +116,52 @@ public class FlinkService {
     /**
      * Get the job status by the given job id.
      */
-    public JobStatus getJobStatus(String jobId) throws Exception {
-        return clientService.getJobStatus(jobId);
+    public JobStatus getJobStatus(String endpoint, String jobId) throws 
Exception {
+        Configuration configuration = getFlinkConfiguration(endpoint);
+        return getFlinkClientService(configuration).getJobStatus(jobId);
+    }
+
+    public JobStatus getJobStatus(FlinkInfo flinkInfo) throws Exception {
+        Configuration configuration = 
getFlinkConfiguration(flinkInfo.getEndpoint());
+        return 
getFlinkClientService(configuration).getJobStatus(flinkInfo.getJobId());
+    }
+
+    private FlinkClientService getFlinkClientService(Configuration 
configuration) {
+        return flinkClientServices.computeIfAbsent(configuration,
+                k -> (FlinkClientService) 
FlinkUtils.getFlinkClientService(configuration, flinkConfig));
+    }
+
+    private Configuration getFlinkConfiguration(String endpoint) {
+        return configurations.computeIfAbsent(endpoint,
+                k -> {
+                    Integer port;
+                    String address;
+                    if (StringUtils.isEmpty(endpoint)) {
+                        address = flinkConfig.getAddress();
+                        port = flinkConfig.getPort();
+                    } else {
+                        Map<String, String> ipPort = 
translateFromEndpoint(endpoint);
+                        if (ipPort.isEmpty()) {
+                            throw new BusinessException("get address:port 
failed from endpoint " + endpoint);
+                        }
+                        address = ipPort.get("address");
+                        port = Integer.valueOf(ipPort.get("port"));
+                    }
+                    // build flink configuration
+                    Configuration configuration = new Configuration();
+                    configuration.setInteger(JobManagerOptions.PORT, 
flinkConfig.getJobManagerPort());
+                    configuration.setString(JobManagerOptions.ADDRESS, 
address);
+                    configuration.setInteger(RestOptions.PORT, port);
+                    return configuration;
+                });
     }
 
     /**
      * Get job detail by the given job id.
      */
-    public JobDetailsInfo getJobDetail(String jobId) throws Exception {
-        return clientService.getJobDetail(jobId);
+    public JobDetailsInfo getJobDetail(FlinkInfo flinkInfo) throws Exception {
+        Configuration configuration = 
getFlinkConfiguration(flinkInfo.getEndpoint());
+        return 
getFlinkClientService(configuration).getJobDetail(flinkInfo.getJobId());
     }
 
     /**
@@ -182,6 +211,8 @@ public class FlinkService {
             }
         }).filter(Objects::nonNull).collect(Collectors.toList());
 
+        Configuration configuration = 
getFlinkConfiguration(flinkInfo.getEndpoint());
+
         PackagedProgram program = PackagedProgram.newBuilder()
                 .setConfiguration(configuration)
                 .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
@@ -192,7 +223,7 @@ public class FlinkService {
         JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, 
configuration, parallelism, false);
         jobGraph.addJars(connectorJars);
 
-        RestClusterClient<StandaloneClusterId> client = 
clientService.getFlinkClient();
+        RestClusterClient<StandaloneClusterId> client = 
getFlinkClientService(configuration).getFlinkClient();
         CompletableFuture<JobID> result = client.submitJob(jobGraph);
         return result.get().toString();
     }
@@ -200,15 +231,18 @@ public class FlinkService {
     /**
      * Stop the Flink job with the savepoint.
      */
-    public String stopJob(String jobId, StopWithSavepointRequest request) 
throws Exception {
-        return clientService.stopJob(jobId, request.isDrain(), 
request.getTargetDirectory());
+    public String stopJob(FlinkInfo flinkInfo, StopWithSavepointRequest 
request) throws Exception {
+        Configuration configuration = 
getFlinkConfiguration(flinkInfo.getEndpoint());
+        return 
getFlinkClientService(configuration).stopJob(flinkInfo.getJobId(), 
request.isDrain(),
+                request.getTargetDirectory());
     }
 
     /**
      * Cancel the Flink job.
      */
-    public void cancelJob(String jobId) throws Exception {
-        clientService.cancelJob(jobId);
+    public void cancelJob(FlinkInfo flinkInfo) throws Exception {
+        Configuration configuration = 
getFlinkConfiguration(flinkInfo.getEndpoint());
+        getFlinkClientService(configuration).cancelJob(flinkInfo.getJobId());
     }
 
     /**
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
index 22003b2925..212c25397c 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
@@ -40,10 +40,10 @@ public class IntegrationTaskRunner implements Runnable {
     private final FlinkInfo flinkInfo;
     private final Integer commitType;
 
-    public IntegrationTaskRunner(FlinkService flinkService, FlinkInfo 
flinkInfo, Integer commitType) {
-        this.flinkService = flinkService;
+    public IntegrationTaskRunner(FlinkInfo flinkInfo, Integer commitType) {
         this.flinkInfo = flinkInfo;
         this.commitType = commitType;
+        flinkService = FlinkService.getInstance();
     }
 
     @Override
@@ -84,12 +84,12 @@ public class IntegrationTaskRunner implements Runnable {
                     FlinkConfig flinkConfig = flinkService.getFlinkConfig();
                     stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
                     
stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
-                    String location = 
flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
+                    String location = flinkService.stopJob(flinkInfo, 
stopWithSavepointRequest);
                     flinkInfo.setSavepointPath(location);
                     log.info("the jobId: {} savepoint: {} ", 
flinkInfo.getJobId(), location);
                     int times = 0;
                     while (times < TRY_MAX_TIMES) {
-                        JobStatus jobStatus = 
flinkService.getJobStatus(flinkInfo.getJobId());
+                        JobStatus jobStatus = 
flinkService.getJobStatus(flinkInfo);
                         // restore job
                         if (jobStatus == FINISHED) {
                             try {
@@ -123,7 +123,7 @@ public class IntegrationTaskRunner implements Runnable {
                     FlinkConfig flinkConfig = flinkService.getFlinkConfig();
                     stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
                     
stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
-                    String location = 
flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
+                    String location = flinkService.stopJob(flinkInfo, 
stopWithSavepointRequest);
                     flinkInfo.setSavepointPath(location);
                     log.info("the jobId {} savepoint: {} ", 
flinkInfo.getJobId(), location);
                 } catch (Exception e) {
@@ -136,9 +136,9 @@ public class IntegrationTaskRunner implements Runnable {
                 break;
             case DELETE:
                 try {
-                    flinkService.cancelJob(flinkInfo.getJobId());
+                    flinkService.cancelJob(flinkInfo);
                     log.info("delete job {} success in backend", 
flinkInfo.getJobId());
-                    JobStatus jobStatus = 
flinkService.getJobStatus(flinkInfo.getJobId());
+                    JobStatus jobStatus = flinkService.getJobStatus(flinkInfo);
                     if (jobStatus.isTerminalState()) {
                         log.info("delete job {} success in backend", 
flinkInfo.getJobId());
                     } else {
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index a7918c640a..009374e2a6 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -22,7 +22,6 @@ import 
org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
@@ -117,8 +116,7 @@ public class DeleteSortListener implements 
SortOperateListener {
             String sortUrl = kvConf.get(InlongConstants.SORT_URL);
             flinkInfo.setEndpoint(sortUrl);
 
-            FlinkService flinkService = new 
FlinkService(flinkInfo.getEndpoint());
-            FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+            FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 flinkOperation.delete(flinkInfo);
                 log.info("job delete success for jobId={}", jobId);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 9a95da354a..0c6828c984 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -22,7 +22,6 @@ import 
org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -127,9 +126,7 @@ public class RestartSortListener implements 
SortOperateListener {
             flinkInfo.setJobName(jobName);
             String sortUrl = kvConf.get(InlongConstants.SORT_URL);
             flinkInfo.setEndpoint(sortUrl);
-
-            FlinkService flinkService = new 
FlinkService(flinkInfo.getEndpoint());
-            FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+            FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 flinkOperation.genPath(flinkInfo, dataflow);
                 // todo Currently, savepoint is not being used to restart, but 
will be improved in the future
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 8fa72f1c4b..0b0e55e369 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -23,7 +23,6 @@ import 
org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -139,10 +138,7 @@ public class StartupSortListener implements 
SortOperateListener {
             String sortUrl = kvConf.get(InlongConstants.SORT_URL);
             flinkInfo.setEndpoint(sortUrl);
             
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
-
-            FlinkService flinkService = new 
FlinkService(flinkInfo.getEndpoint());
-            FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-
+            FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 flinkOperation.genPath(flinkInfo, dataflow);
                 flinkOperation.start(flinkInfo);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 99c0245168..c66f76d467 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -23,7 +23,6 @@ import 
org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -132,10 +131,7 @@ public class StartupStreamListener implements 
SortOperateListener {
         String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
         
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
-
-        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-
+        FlinkOperation flinkOperation = FlinkOperation.getInstance();
         try {
             flinkOperation.genPath(flinkInfo, dataflow);
             flinkOperation.start(flinkInfo);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 06a76e1bf7..72794e5006 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -22,7 +22,6 @@ import 
org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
@@ -116,9 +115,7 @@ public class SuspendSortListener implements 
SortOperateListener {
             flinkInfo.setJobId(jobId);
             String sortUrl = kvConf.get(InlongConstants.SORT_URL);
             flinkInfo.setEndpoint(sortUrl);
-
-            FlinkService flinkService = new 
FlinkService(flinkInfo.getEndpoint());
-            FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+            FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 // todo Currently, savepoint is not being used to stop, but 
will be improved in the future
                 flinkOperation.delete(flinkInfo);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
index 84509b8a0e..88c984c760 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
@@ -101,9 +101,9 @@ public class SortStatusPoller implements SortPoller {
                 }
 
                 String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-                FlinkService flinkService = new FlinkService(sortUrl);
                 statusInfo.setSortStatus(
-                        
JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId), 
SortStatus.UNKNOWN));
+                        
JOB_SORT_STATUS_MAP.getOrDefault(FlinkService.getInstance().getJobStatus(sortUrl,
 jobId),
+                                SortStatus.UNKNOWN));
                 statusInfos.add(statusInfo);
             } catch (Exception e) {
                 log.error("polling sort status failed for groupId=" + 
streamInfo.getInlongGroupId() + " streamId="
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
deleted file mode 100644
index d5f3d8ed6e..0000000000
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.util;
-
-import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Properties;
-
-import static 
org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
-import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
-import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
-import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
-import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
-import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
-import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
-import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
-
-/**
- * Configuration file for Flink, only one instance in the process.
- * Basically it used properties file to store.
- */
-public class FlinkConfiguration {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkConfiguration.class);
-
-    private static final String DEFAULT_CONFIG_FILE = 
"flink-sort-plugin.properties";
-    private static final String INLONG_MANAGER = "inlong-manager";
-
-    private final FlinkConfig flinkConfig;
-
-    /**
-     * load config from flink file.
-     */
-    public FlinkConfiguration() throws Exception {
-        String path = formatPath();
-        flinkConfig = getFlinkConfigFromFile(path);
-    }
-
-    /**
-     * fetch DEFAULT_CONFIG_FILE full path
-     */
-    private String formatPath() throws Exception {
-        String path = 
this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
-        LOGGER.info("format first path {}", path);
-
-        int index = path.indexOf(INLONG_MANAGER);
-        if (index == -1) {
-            throw new Exception(INLONG_MANAGER + " path not found in " + path);
-        }
-
-        path = path.substring(0, index);
-        String confPath = path + INLONG_MANAGER + File.separator + "plugins" + 
File.separator + DEFAULT_CONFIG_FILE;
-        File file = new File(confPath);
-        if (!file.exists()) {
-            String message = String.format("not found %s in path %s", 
DEFAULT_CONFIG_FILE, confPath);
-            LOGGER.error(message);
-            throw new Exception(message);
-        }
-
-        LOGGER.info("after format, {} located in {}", DEFAULT_CONFIG_FILE, 
confPath);
-        return confPath;
-    }
-
-    /**
-     * get flink config
-     */
-    public FlinkConfig getFlinkConfig() {
-        return flinkConfig;
-    }
-
-    /**
-     * parse properties
-     */
-    private FlinkConfig getFlinkConfigFromFile(String fileName) throws 
IOException {
-        Properties properties = new Properties();
-        try (BufferedReader bufferedReader = new BufferedReader(new 
FileReader(fileName))) {
-            properties.load(bufferedReader);
-        }
-        FlinkConfig flinkConfig = new FlinkConfig();
-        flinkConfig.setPort(Integer.valueOf(properties.getProperty(PORT)));
-        flinkConfig.setAddress(properties.getProperty(ADDRESS));
-        
flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM)));
-        
flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY));
-        
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
-        
flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
-        
flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY));
-        flinkConfig.setVersion(properties.getProperty(FLINK_VERSION));
-        return flinkConfig;
-    }
-
-}
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
deleted file mode 100644
index 82cd665538..0000000000
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.util;
-
-import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.configuration.Configuration;
-
-import java.io.File;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-@Slf4j
-public class FlinkServiceUtils {
-
-    private static final String DEFAULT_PLUGINS = "plugins";
-
-    private static final String FILE_PREFIX = "file://";
-
-    public static Object getFlinkClientService(Configuration configuration, 
FlinkConfig flinkConfig) {
-        log.info("Flink version {}", flinkConfig.getVersion());
-
-        Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath();
-        String flinkJarName = String.format(Constants.FLINK_JAR_NAME, 
flinkConfig.getVersion());
-        String flinkClientPath = FILE_PREFIX + pluginPath + File.separator + 
flinkJarName;
-        log.info("Start to load Flink jar: {}", flinkClientPath);
-
-        try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new 
URL(flinkClientPath)}, Thread.currentThread()
-                .getContextClassLoader())) {
-            Class<?> flinkClientService = 
classLoader.loadClass(Constants.FLINK_CLIENT_CLASS);
-            Object flinkService = 
flinkClientService.getDeclaredConstructor(Configuration.class)
-                    .newInstance(configuration);
-            log.info("Successfully loaded Flink service");
-            return flinkService;
-        } catch (Exception e) {
-            log.error("Failed to loaded Flink service, please check flink 
client jar path: {}", flinkClientPath);
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index c3daefecfb..61e7aac484 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -17,25 +17,40 @@
 
 package org.apache.inlong.manager.plugin.util;
 
+import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
 
+import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
+import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
+import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
+import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
+import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
+
 /**
  * Util of flink.
  */
@@ -43,27 +58,9 @@ import java.util.regex.Pattern;
 public class FlinkUtils {
 
     public static final String BASE_DIRECTORY = "config";
-
-    public static final List<String> FLINK_VERSION_COLLECTION = 
Collections.singletonList("Flink-1.13");
-
-    /**
-     * getLatestFlinkVersion
-     */
-    public static String getLatestFlinkVersion(String[] supportedFlink) {
-        if (Objects.isNull(supportedFlink)) {
-            return null;
-        }
-        Arrays.sort(supportedFlink, Collections.reverseOrder());
-        String latestFinkVersion = null;
-        for (String flinkVersion : supportedFlink) {
-            latestFinkVersion = FLINK_VERSION_COLLECTION.stream()
-                    .filter(v -> 
v.equals(flinkVersion)).findFirst().orElse(null);
-            if (Objects.nonNull(latestFinkVersion)) {
-                return latestFinkVersion;
-            }
-        }
-        return latestFinkVersion;
-    }
+    private static final String DEFAULT_PLUGINS = "plugins";
+    private static final String FILE_PREFIX = "file://";
+    private static final String DEFAULT_CONFIG_FILE = 
"flink-sort-plugin.properties";
 
     /**
      * print exception
@@ -127,13 +124,6 @@ public class FlinkUtils {
         return result;
     }
 
-    /**
-     * get value
-     */
-    public static String getValue(String key, String defaultValue) {
-        return StringUtils.isNotEmpty(key) ? key : defaultValue;
-    }
-
     /**
      * getConfigDirectory
      *
@@ -171,23 +161,46 @@ public class FlinkUtils {
         return true;
     }
 
-    /**
-     * Delete configuration file
-     *
-     * @param name file config info
-     * @return whether sucess
-     */
-    public static boolean deleteConfigFile(String name) {
-        String configDirectory = getConfigDirectory(name);
-        File file = new File(configDirectory);
-        if (file.exists()) {
-            try {
-                FileUtils.deleteDirectory(file);
-            } catch (IOException e) {
-                log.error("delete {} failed", configDirectory, e);
-                return false;
-            }
+    public static Object getFlinkClientService(Configuration configuration, 
FlinkConfig flinkConfig) {
+        log.info("Flink version {}", flinkConfig.getVersion());
+
+        Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath();
+        String flinkJarName = String.format(Constants.FLINK_JAR_NAME, 
flinkConfig.getVersion());
+        String flinkClientPath = FILE_PREFIX + pluginPath + File.separator + 
flinkJarName;
+        log.info("Start to load Flink jar: {}", flinkClientPath);
+
+        try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new 
URL(flinkClientPath)}, Thread.currentThread()
+                .getContextClassLoader())) {
+            Class<?> flinkClientService = 
classLoader.loadClass(Constants.FLINK_CLIENT_CLASS);
+            Object flinkService = 
flinkClientService.getDeclaredConstructor(Configuration.class)
+                    .newInstance(configuration);
+            log.info("Successfully loaded Flink service");
+            return flinkService;
+        } catch (Exception e) {
+            log.error("Failed to loaded Flink service, please check flink 
client jar path: {}", flinkClientPath);
+            throw new RuntimeException(e);
         }
-        return true;
+    }
+
+    public static FlinkConfig getFlinkConfigFromFile() throws Exception {
+        Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath();
+        String defaultConfigFilePath = pluginPath + File.separator + 
DEFAULT_CONFIG_FILE;
+
+        log.info("Start to load Flink config from file: {}", 
defaultConfigFilePath);
+
+        Properties properties = new Properties();
+        try (BufferedReader bufferedReader = new BufferedReader(new 
FileReader(defaultConfigFilePath))) {
+            properties.load(bufferedReader);
+        }
+        FlinkConfig flinkConfig = new FlinkConfig();
+        flinkConfig.setPort(Integer.valueOf(properties.getProperty(PORT)));
+        flinkConfig.setAddress(properties.getProperty(ADDRESS));
+        
flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM)));
+        
flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY));
+        
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
+        
flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
+        
flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY));
+        flinkConfig.setVersion(properties.getProperty(FLINK_VERSION));
+        return flinkConfig;
     }
 }


Reply via email to