This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4802277803 [INLONG-9023][Manager] Fix the problem of failed to stop job task (#9024)
4802277803 is described below

commit 4802277803e2535ef072ae0d1d2ab6b8f5d0c991
Author: fuweng11 <[email protected]>
AuthorDate: Sun Oct 8 16:52:58 2023 +0800

    [INLONG-9023][Manager] Fix the problem of failed to stop job task (#9024)
---
 .../manager/plugin/flink/FlinkOperation.java       |  20 ++--
 .../plugin/listener/DeleteSortListener.java        |   6 +-
 .../plugin/listener/DeleteStreamListener.java      |  90 +-----------------
 .../plugin/listener/RestartSortListener.java       |  23 ++++-
 .../plugin/listener/RestartStreamListener.java     | 104 +--------------------
 .../plugin/listener/StartupSortListener.java       |   6 +-
 .../plugin/listener/SuspendSortListener.java       |  10 +-
 .../plugin/listener/SuspendStreamListener.java     |  91 +-----------------
 8 files changed, 52 insertions(+), 298 deletions(-)

diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 2cb5ce3868..9f5d9f7708 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -51,14 +51,13 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.api.common.JobStatus.RUNNING;
-
 /**
  * Flink task operation, such restart or stop flink job.
  */
 @Slf4j
 public class FlinkOperation {
 
+    private static final Integer TRY_MAX_TIMES = 60;
     private static final String CONFIG_FILE = "application.properties";
     private static final String CONNECTOR_DIR_KEY = "sort.connector.dir";
     private static final String JOB_TERMINATED_MSG = "the job not found by id 
%s, "
@@ -332,7 +331,7 @@ public class FlinkOperation {
     /**
      * Status of Flink job.
      */
-    public void pollJobStatus(FlinkInfo flinkInfo) throws Exception {
+    public void pollJobStatus(FlinkInfo flinkInfo, JobStatus expectStatus) 
throws Exception {
         if (flinkInfo.isException()) {
             throw new BusinessException("startup failed: " + 
flinkInfo.getExceptionMsg());
         }
@@ -341,8 +340,8 @@ public class FlinkOperation {
             log.error("job id cannot empty for {}", flinkInfo);
             throw new Exception("job id cannot empty");
         }
-
-        while (true) {
+        int retryTimes = 0;
+        while (retryTimes <= TRY_MAX_TIMES) {
             try {
                 JobDetailsInfo jobDetailsInfo = 
flinkService.getJobDetail(jobId);
                 if (jobDetailsInfo == null) {
@@ -351,20 +350,21 @@ public class FlinkOperation {
                 }
 
                 JobStatus jobStatus = jobDetailsInfo.getJobStatus();
-                if (jobStatus.isTerminalState()) {
+                if (jobStatus.isTerminalState() && expectStatus != 
JobStatus.CANCELED) {
                     log.error("job was terminated for {}, exception: {}", 
jobId, flinkInfo.getExceptionMsg());
                     throw new Exception("job was terminated for " + jobId);
                 }
 
-                if (jobStatus == RUNNING) {
-                    log.info("job status is Running for {}", jobId);
+                if (jobStatus == expectStatus) {
+                    log.info("job status is {} for {}", jobStatus, jobId);
                     break;
                 }
-                log.info("job was not Running for {}", jobId);
-                TimeUnit.SECONDS.sleep(5);
+                log.info("job status is {} for {}", jobStatus, jobId);
             } catch (Exception e) {
                 log.error("poll job status error for {}, exception: ", 
flinkInfo, e);
             }
+            TimeUnit.SECONDS.sleep(5);
+            retryTimes++;
         }
     }
 
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 63d942d2a1..d7f33fd77d 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -35,6 +35,7 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
 
 import java.util.HashMap;
 import java.util.List;
@@ -113,16 +114,17 @@ public class DeleteSortListener implements 
SortOperateListener {
         try {
             flinkOperation.delete(flinkInfo);
             log.info("job delete success for jobId={}", jobId);
-            return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            flinkOperation.pollJobStatus(flinkInfo);
+            flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
 
             String message = String.format("delete sort failed for 
groupId=%s", groupId);
             log.error(message, e);
             return ListenerResult.fail(message + ": " + e.getMessage());
         }
+        flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
+        return ListenerResult.success();
     }
 
 }
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 4bb20c6de6..49773e635c 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -17,32 +17,12 @@
 
 package org.apache.inlong.manager.plugin.listener;
 
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
  * Listener of delete stream sort
@@ -57,77 +37,13 @@ public class DeleteStreamListener implements 
SortOperateListener {
 
     @Override
     public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add delete stream listener, not 
StreamResourceProcessForm for groupId={}", groupId);
-            return false;
-        }
-
-        StreamResourceProcessForm streamProcessForm = 
(StreamResourceProcessForm) processForm;
-        String streamId = 
streamProcessForm.getStreamInfo().getInlongStreamId();
-        if (streamProcessForm.getGroupOperateType() != 
GroupOperateType.DELETE) {
-            log.info("not add delete stream listener, as the operate was not 
DELETE for groupId={} streamId={}",
-                    groupId, streamId);
-            return false;
-        }
-
-        log.info("add delete stream listener for groupId={} streamId={}", 
groupId, streamId);
-        return true;
+        log.info("not need to delete the sort task for InlongStream");
+        return false;
     }
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        ProcessForm processForm = context.getProcessForm();
-        StreamResourceProcessForm streamResourceProcessForm = 
(StreamResourceProcessForm) processForm;
-        InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
-        log.info("inlong group: {} ext info: {}", 
groupInfo.getInlongGroupId(), groupExtList);
-
-        InlongStreamInfo streamInfo = 
streamResourceProcessForm.getStreamInfo();
-        List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
-        log.info("inlong stream: {} ext info: {}", 
streamInfo.getInlongStreamId(), streamExtList);
-
-        Map<String, String> kvConf = new HashMap<>();
-        groupExtList.forEach(groupExtInfo -> 
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
-        streamExtList.forEach(extInfo -> kvConf.put(extInfo.getKeyName(), 
extInfo.getKeyValue()));
-
-        final String groupId = streamInfo.getInlongGroupId();
-        final String streamId = streamInfo.getInlongStreamId();
-        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                    });
-            kvConf.putAll(result);
-        }
-
-        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
-        if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for 
groupId=%s streamId=%s", groupId, streamId);
-            return ListenerResult.fail(message);
-        }
-
-        FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobId(jobId);
-        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-        flinkInfo.setEndpoint(sortUrl);
-
-        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-        try {
-            flinkOperation.delete(flinkInfo);
-            log.info("job delete success for jobId={}", jobId);
-            return ListenerResult.success();
-        } catch (Exception e) {
-            flinkInfo.setException(true);
-            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            flinkOperation.pollJobStatus(flinkInfo);
-
-            String message = String.format("delete sort failed for groupId=%s 
streamId=%s", groupId, streamId);
-            log.error(message, e);
-            return ListenerResult.fail(message + e.getMessage());
-        }
+        return ListenerResult.success();
     }
 
 }
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index f228f9fe81..c8a223e0af 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -36,6 +36,7 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
 
 import java.util.HashMap;
 import java.util.List;
@@ -111,7 +112,6 @@ public class RestartSortListener implements 
SortOperateListener {
         }
 
         FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobId(jobId);
         String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
         flinkInfo.setJobName(jobName);
         String sortUrl = kvConf.get(InlongConstants.SORT_URL);
@@ -121,18 +121,33 @@ public class RestartSortListener implements 
SortOperateListener {
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
             flinkOperation.genPath(flinkInfo, dataflow);
-            flinkOperation.restart(flinkInfo);
+            // todo Currently, savepoint is not being used to restart, but 
will be improved in the future
+            flinkOperation.start(flinkInfo);
             log.info("job restart success for [{}]", jobId);
-            return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            flinkOperation.pollJobStatus(flinkInfo);
+            flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
 
             String message = String.format("restart sort failed for groupId 
[%s] ", groupId);
             log.error(message, e);
             return ListenerResult.fail(message + e.getMessage());
         }
+        extList.forEach(groupExtInfo -> 
kvConf.remove(InlongConstants.SORT_JOB_ID));
+        saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), 
extList);
+        flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
+        return ListenerResult.success();
+    }
+
+    /**
+     * Save ext info into list.
+     */
+    private void saveInfo(String inlongGroupId, String keyName, String 
keyValue, List<InlongGroupExtInfo> extInfoList) {
+        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+        extInfo.setInlongGroupId(inlongGroupId);
+        extInfo.setKeyName(keyName);
+        extInfo.setKeyValue(keyValue);
+        extInfoList.add(extInfo);
     }
 
 }
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index da88e53ffb..6e4f3a8953 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -17,34 +17,12 @@
 
 package org.apache.inlong.manager.plugin.listener;
 
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
  * Listener of restart stream sort.
@@ -59,89 +37,13 @@ public class RestartStreamListener implements 
SortOperateListener {
 
     @Override
     public boolean accept(WorkflowContext workflowContext) {
-        ProcessForm processForm = workflowContext.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add restart stream listener, not 
StreamResourceProcessForm for groupId [{}]", groupId);
-            return false;
-        }
-
-        StreamResourceProcessForm streamProcessForm = 
(StreamResourceProcessForm) processForm;
-        String streamId = 
streamProcessForm.getStreamInfo().getInlongStreamId();
-        if (streamProcessForm.getGroupOperateType() != 
GroupOperateType.RESTART) {
-            log.info("not add restart stream listener, as the operate was not 
RESTART for groupId [{}] streamId [{}]",
-                    groupId, streamId);
-            return false;
-        }
-
-        log.info("add restart stream listener for groupId [{}] streamId [{}]", 
groupId, streamId);
-        return true;
+        log.info("not need to restart the sort task for InlongStream");
+        return false;
     }
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        ProcessForm processForm = context.getProcessForm();
-        StreamResourceProcessForm streamResourceProcessForm = 
(StreamResourceProcessForm) processForm;
-        InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
-        log.info("inlong group :{} ext info: {}", 
groupInfo.getInlongGroupId(), groupExtList);
-        InlongStreamInfo streamInfo = 
streamResourceProcessForm.getStreamInfo();
-        List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
-        log.info("inlong stream :{} ext info: {}", 
streamInfo.getInlongStreamId(), streamExtList);
-
-        Map<String, String> kvConf = new HashMap<>();
-        groupExtList.forEach(groupExtInfo -> 
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
-        // There is a possibility that the extList value is null
-        if (CollectionUtils.isNotEmpty(streamExtList)) {
-            streamExtList.forEach(extInfo -> {
-                kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
-            });
-        }
-        final String groupId = streamInfo.getInlongGroupId();
-        final String streamId = streamInfo.getInlongStreamId();
-        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                    });
-            kvConf.putAll(result);
-        }
-
-        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
-        if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId 
[%s] streamId [%s]", groupId, streamId);
-            return ListenerResult.fail(message);
-        }
-        String dataflow = kvConf.get(InlongConstants.DATAFLOW);
-        if (StringUtils.isEmpty(dataflow)) {
-            String message = String.format("dataflow is empty for groupId [%s] 
streamId [%s]", groupId, streamId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
-
-        FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobId(jobId);
-        String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
-        flinkInfo.setJobName(jobName);
-        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-        flinkInfo.setEndpoint(sortUrl);
-
-        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-        try {
-            flinkOperation.genPath(flinkInfo, dataflow);
-            flinkOperation.restart(flinkInfo);
-            log.info("job restart success for [{}]", jobId);
-            return ListenerResult.success();
-        } catch (Exception e) {
-            flinkInfo.setException(true);
-            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            flinkOperation.pollJobStatus(flinkInfo);
-
-            String message = String.format("restart sort failed for groupId 
[%s] streamId [%s] ", groupId, streamId);
-            log.error(message, e);
-            return ListenerResult.fail(message + e.getMessage());
-        }
+        return ListenerResult.success();
     }
 
 }
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 8348a9281a..b65fb40ea6 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -37,6 +37,7 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
 
 import java.util.List;
 import java.util.Map;
@@ -132,10 +133,9 @@ public class StartupSortListener implements 
SortOperateListener {
             flinkOperation.start(flinkInfo);
             log.info("job submit success, jobId is [{}]", 
flinkInfo.getJobId());
         } catch (Exception e) {
-            flinkOperation.pollJobStatus(flinkInfo);
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            flinkOperation.pollJobStatus(flinkInfo);
+            flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
 
             String message = String.format("startup sort failed for groupId 
[%s] ", groupId);
             log.error(message, e);
@@ -143,7 +143,7 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), 
extList);
-        flinkOperation.pollJobStatus(flinkInfo);
+        flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 76cdc60586..affe9bbd22 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -35,6 +35,7 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
 
 import java.util.HashMap;
 import java.util.List;
@@ -111,18 +112,21 @@ public class SuspendSortListener implements 
SortOperateListener {
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
-            flinkOperation.stop(flinkInfo);
+            // todo Currently, savepoint is not being used to stop, but will 
be improved in the future
+            flinkOperation.delete(flinkInfo);
             log.info("job suspend success for [{}]", jobId);
-            return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            flinkOperation.pollJobStatus(flinkInfo);
+            flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
 
             String message = String.format("suspend sort failed for groupId 
[%s] ", groupId);
             log.error(message, e);
             return ListenerResult.fail(message + e.getMessage());
         }
+        flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
+        return ListenerResult.success();
+
     }
 
 }
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index 7a0371a323..7eda3df1be 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -17,32 +17,12 @@
 
 package org.apache.inlong.manager.plugin.listener;
 
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
  * Listener of suspend stream sort.
@@ -57,78 +37,13 @@ public class SuspendStreamListener implements 
SortOperateListener {
 
     @Override
     public boolean accept(WorkflowContext workflowContext) {
-        ProcessForm processForm = workflowContext.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add suspend stream listener, not 
StreamResourceProcessForm for groupId [{}]", groupId);
-            return false;
-        }
-
-        StreamResourceProcessForm streamProcessForm = 
(StreamResourceProcessForm) processForm;
-        String streamId = 
streamProcessForm.getStreamInfo().getInlongStreamId();
-        if (streamProcessForm.getGroupOperateType() != 
GroupOperateType.SUSPEND) {
-            log.info("not add suspend stream listener as the operate SUSPEND 
for groupId [{}] streamId [{}]",
-                    groupId, streamId);
-            return false;
-        }
-
-        log.info("add suspend stream listener for groupId [{}] streamId [{}]", 
groupId, streamId);
-        return true;
+        log.info("not need to suspend the sort task for InlongStream");
+        return false;
     }
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        ProcessForm processForm = context.getProcessForm();
-        StreamResourceProcessForm streamResourceProcessForm = 
(StreamResourceProcessForm) processForm;
-        InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
-        log.info("inlong group :{} ext info: {}", 
groupInfo.getInlongGroupId(), groupExtList);
-        InlongStreamInfo streamInfo = 
streamResourceProcessForm.getStreamInfo();
-        List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
-        log.info("inlong stream :{} ext info: {}", 
streamInfo.getInlongStreamId(), streamExtList);
-
-        Map<String, String> kvConf = new HashMap<>();
-        groupExtList.forEach(groupExtInfo -> 
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
-        streamExtList.forEach(extInfo -> {
-            kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
-        });
-
-        final String groupId = streamInfo.getInlongGroupId();
-        final String streamId = streamInfo.getInlongStreamId();
-        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                    });
-            kvConf.putAll(result);
-        }
-
-        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
-        if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId 
[%s] streamId [%s]", groupId, streamId);
-            return ListenerResult.fail(message);
-        }
-
-        FlinkInfo flinkInfo = new FlinkInfo();
-        flinkInfo.setJobId(jobId);
-        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-        flinkInfo.setEndpoint(sortUrl);
-
-        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
-        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-        try {
-            flinkOperation.stop(flinkInfo);
-            log.info("job suspend success for [{}]", jobId);
-            return ListenerResult.success();
-        } catch (Exception e) {
-            flinkInfo.setException(true);
-            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
-            flinkOperation.pollJobStatus(flinkInfo);
-
-            String message = String.format("suspend sort failed for groupId 
[%s] streamId[%s]", groupId, streamId);
-            log.error(message, e);
-            return ListenerResult.fail(message + e.getMessage());
-        }
+        return ListenerResult.success();
     }
 
 }

Reply via email to