This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4802277803 [INLONG-9023][Manager] Fix the problem of failed to stop job task (#9024)
4802277803 is described below
commit 4802277803e2535ef072ae0d1d2ab6b8f5d0c991
Author: fuweng11 <[email protected]>
AuthorDate: Sun Oct 8 16:52:58 2023 +0800
[INLONG-9023][Manager] Fix the problem of failed to stop job task (#9024)
---
.../manager/plugin/flink/FlinkOperation.java | 20 ++--
.../plugin/listener/DeleteSortListener.java | 6 +-
.../plugin/listener/DeleteStreamListener.java | 90 +-----------------
.../plugin/listener/RestartSortListener.java | 23 ++++-
.../plugin/listener/RestartStreamListener.java | 104 +--------------------
.../plugin/listener/StartupSortListener.java | 6 +-
.../plugin/listener/SuspendSortListener.java | 10 +-
.../plugin/listener/SuspendStreamListener.java | 91 +-----------------
8 files changed, 52 insertions(+), 298 deletions(-)
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 2cb5ce3868..9f5d9f7708 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -51,14 +51,13 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.apache.flink.api.common.JobStatus.RUNNING;
-
/**
* Flink task operation, such restart or stop flink job.
*/
@Slf4j
public class FlinkOperation {
+ private static final Integer TRY_MAX_TIMES = 60;
private static final String CONFIG_FILE = "application.properties";
private static final String CONNECTOR_DIR_KEY = "sort.connector.dir";
private static final String JOB_TERMINATED_MSG = "the job not found by id
%s, "
@@ -332,7 +331,7 @@ public class FlinkOperation {
/**
* Status of Flink job.
*/
- public void pollJobStatus(FlinkInfo flinkInfo) throws Exception {
+ public void pollJobStatus(FlinkInfo flinkInfo, JobStatus expectStatus)
throws Exception {
if (flinkInfo.isException()) {
throw new BusinessException("startup failed: " +
flinkInfo.getExceptionMsg());
}
@@ -341,8 +340,8 @@ public class FlinkOperation {
log.error("job id cannot empty for {}", flinkInfo);
throw new Exception("job id cannot empty");
}
-
- while (true) {
+ int retryTimes = 0;
+ while (retryTimes <= TRY_MAX_TIMES) {
try {
JobDetailsInfo jobDetailsInfo =
flinkService.getJobDetail(jobId);
if (jobDetailsInfo == null) {
@@ -351,20 +350,21 @@ public class FlinkOperation {
}
JobStatus jobStatus = jobDetailsInfo.getJobStatus();
- if (jobStatus.isTerminalState()) {
+ if (jobStatus.isTerminalState() && expectStatus !=
JobStatus.CANCELED) {
log.error("job was terminated for {}, exception: {}",
jobId, flinkInfo.getExceptionMsg());
throw new Exception("job was terminated for " + jobId);
}
- if (jobStatus == RUNNING) {
- log.info("job status is Running for {}", jobId);
+ if (jobStatus == expectStatus) {
+ log.info("job status is {} for {}", jobStatus, jobId);
break;
}
- log.info("job was not Running for {}", jobId);
- TimeUnit.SECONDS.sleep(5);
+ log.info("job status is {} for {}", jobStatus, jobId);
} catch (Exception e) {
log.error("poll job status error for {}, exception: ",
flinkInfo, e);
}
+ TimeUnit.SECONDS.sleep(5);
+ retryTimes++;
}
}
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 63d942d2a1..d7f33fd77d 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -35,6 +35,7 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
import java.util.HashMap;
import java.util.List;
@@ -113,16 +114,17 @@ public class DeleteSortListener implements
SortOperateListener {
try {
flinkOperation.delete(flinkInfo);
log.info("job delete success for jobId={}", jobId);
- return ListenerResult.success();
} catch (Exception e) {
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- flinkOperation.pollJobStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
String message = String.format("delete sort failed for
groupId=%s", groupId);
log.error(message, e);
return ListenerResult.fail(message + ": " + e.getMessage());
}
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
+ return ListenerResult.success();
}
}
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 4bb20c6de6..49773e635c 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -17,32 +17,12 @@
package org.apache.inlong.manager.plugin.listener;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
-import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
* Listener of delete stream sort
@@ -57,77 +37,13 @@ public class DeleteStreamListener implements
SortOperateListener {
@Override
public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- log.info("not add delete stream listener, not
StreamResourceProcessForm for groupId={}", groupId);
- return false;
- }
-
- StreamResourceProcessForm streamProcessForm =
(StreamResourceProcessForm) processForm;
- String streamId =
streamProcessForm.getStreamInfo().getInlongStreamId();
- if (streamProcessForm.getGroupOperateType() !=
GroupOperateType.DELETE) {
- log.info("not add delete stream listener, as the operate was not
DELETE for groupId={} streamId={}",
- groupId, streamId);
- return false;
- }
-
- log.info("add delete stream listener for groupId={} streamId={}",
groupId, streamId);
- return true;
+ log.info("not need to delete the sort task for InlongStream");
+ return false;
}
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- ProcessForm processForm = context.getProcessForm();
- StreamResourceProcessForm streamResourceProcessForm =
(StreamResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
- log.info("inlong group: {} ext info: {}",
groupInfo.getInlongGroupId(), groupExtList);
-
- InlongStreamInfo streamInfo =
streamResourceProcessForm.getStreamInfo();
- List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
- log.info("inlong stream: {} ext info: {}",
streamInfo.getInlongStreamId(), streamExtList);
-
- Map<String, String> kvConf = new HashMap<>();
- groupExtList.forEach(groupExtInfo ->
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
- streamExtList.forEach(extInfo -> kvConf.put(extInfo.getKeyName(),
extInfo.getKeyValue()));
-
- final String groupId = streamInfo.getInlongGroupId();
- final String streamId = streamInfo.getInlongStreamId();
- String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
- }
-
- String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
- if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for
groupId=%s streamId=%s", groupId, streamId);
- return ListenerResult.fail(message);
- }
-
- FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(jobId);
- String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
-
- FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
- try {
- flinkOperation.delete(flinkInfo);
- log.info("job delete success for jobId={}", jobId);
- return ListenerResult.success();
- } catch (Exception e) {
- flinkInfo.setException(true);
- flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- flinkOperation.pollJobStatus(flinkInfo);
-
- String message = String.format("delete sort failed for groupId=%s
streamId=%s", groupId, streamId);
- log.error(message, e);
- return ListenerResult.fail(message + e.getMessage());
- }
+ return ListenerResult.success();
}
}
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index f228f9fe81..c8a223e0af 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -36,6 +36,7 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
import java.util.HashMap;
import java.util.List;
@@ -111,7 +112,6 @@ public class RestartSortListener implements
SortOperateListener {
}
FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(jobId);
String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
flinkInfo.setJobName(jobName);
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
@@ -121,18 +121,33 @@ public class RestartSortListener implements
SortOperateListener {
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
flinkOperation.genPath(flinkInfo, dataflow);
- flinkOperation.restart(flinkInfo);
+ // todo Currently, savepoint is not being used to restart, but
will be improved in the future
+ flinkOperation.start(flinkInfo);
log.info("job restart success for [{}]", jobId);
- return ListenerResult.success();
} catch (Exception e) {
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- flinkOperation.pollJobStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
String message = String.format("restart sort failed for groupId
[%s] ", groupId);
log.error(message, e);
return ListenerResult.fail(message + e.getMessage());
}
+ extList.forEach(groupExtInfo ->
kvConf.remove(InlongConstants.SORT_JOB_ID));
+ saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(),
extList);
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
+ return ListenerResult.success();
+ }
+
+ /**
+ * Save ext info into list.
+ */
+ private void saveInfo(String inlongGroupId, String keyName, String
keyValue, List<InlongGroupExtInfo> extInfoList) {
+ InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+ extInfo.setInlongGroupId(inlongGroupId);
+ extInfo.setKeyName(keyName);
+ extInfo.setKeyValue(keyValue);
+ extInfoList.add(extInfo);
}
}
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index da88e53ffb..6e4f3a8953 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -17,34 +17,12 @@
package org.apache.inlong.manager.plugin.listener;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
-import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
* Listener of restart stream sort.
@@ -59,89 +37,13 @@ public class RestartStreamListener implements
SortOperateListener {
@Override
public boolean accept(WorkflowContext workflowContext) {
- ProcessForm processForm = workflowContext.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- log.info("not add restart stream listener, not
StreamResourceProcessForm for groupId [{}]", groupId);
- return false;
- }
-
- StreamResourceProcessForm streamProcessForm =
(StreamResourceProcessForm) processForm;
- String streamId =
streamProcessForm.getStreamInfo().getInlongStreamId();
- if (streamProcessForm.getGroupOperateType() !=
GroupOperateType.RESTART) {
- log.info("not add restart stream listener, as the operate was not
RESTART for groupId [{}] streamId [{}]",
- groupId, streamId);
- return false;
- }
-
- log.info("add restart stream listener for groupId [{}] streamId [{}]",
groupId, streamId);
- return true;
+ log.info("not need to restart the sort task for InlongStream");
+ return false;
}
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- ProcessForm processForm = context.getProcessForm();
- StreamResourceProcessForm streamResourceProcessForm =
(StreamResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
- log.info("inlong group :{} ext info: {}",
groupInfo.getInlongGroupId(), groupExtList);
- InlongStreamInfo streamInfo =
streamResourceProcessForm.getStreamInfo();
- List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
- log.info("inlong stream :{} ext info: {}",
streamInfo.getInlongStreamId(), streamExtList);
-
- Map<String, String> kvConf = new HashMap<>();
- groupExtList.forEach(groupExtInfo ->
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
- // There is a possibility that the extList value is null
- if (CollectionUtils.isNotEmpty(streamExtList)) {
- streamExtList.forEach(extInfo -> {
- kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
- });
- }
- final String groupId = streamInfo.getInlongGroupId();
- final String streamId = streamInfo.getInlongStreamId();
- String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
- }
-
- String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
- if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId
[%s] streamId [%s]", groupId, streamId);
- return ListenerResult.fail(message);
- }
- String dataflow = kvConf.get(InlongConstants.DATAFLOW);
- if (StringUtils.isEmpty(dataflow)) {
- String message = String.format("dataflow is empty for groupId [%s]
streamId [%s]", groupId, streamId);
- log.error(message);
- return ListenerResult.fail(message);
- }
-
- FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(jobId);
- String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
- flinkInfo.setJobName(jobName);
- String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
-
- FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
- try {
- flinkOperation.genPath(flinkInfo, dataflow);
- flinkOperation.restart(flinkInfo);
- log.info("job restart success for [{}]", jobId);
- return ListenerResult.success();
- } catch (Exception e) {
- flinkInfo.setException(true);
- flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- flinkOperation.pollJobStatus(flinkInfo);
-
- String message = String.format("restart sort failed for groupId
[%s] streamId [%s] ", groupId, streamId);
- log.error(message, e);
- return ListenerResult.fail(message + e.getMessage());
- }
+ return ListenerResult.success();
}
}
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 8348a9281a..b65fb40ea6 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -37,6 +37,7 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
import java.util.List;
import java.util.Map;
@@ -132,10 +133,9 @@ public class StartupSortListener implements
SortOperateListener {
flinkOperation.start(flinkInfo);
log.info("job submit success, jobId is [{}]",
flinkInfo.getJobId());
} catch (Exception e) {
- flinkOperation.pollJobStatus(flinkInfo);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- flinkOperation.pollJobStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
String message = String.format("startup sort failed for groupId
[%s] ", groupId);
log.error(message, e);
@@ -143,7 +143,7 @@ public class StartupSortListener implements
SortOperateListener {
}
saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(),
extList);
- flinkOperation.pollJobStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 76cdc60586..affe9bbd22 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -35,6 +35,7 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
import java.util.HashMap;
import java.util.List;
@@ -111,18 +112,21 @@ public class SuspendSortListener implements
SortOperateListener {
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
FlinkOperation flinkOperation = new FlinkOperation(flinkService);
try {
- flinkOperation.stop(flinkInfo);
+ // todo Currently, savepoint is not being used to stop, but will
be improved in the future
+ flinkOperation.delete(flinkInfo);
log.info("job suspend success for [{}]", jobId);
- return ListenerResult.success();
} catch (Exception e) {
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- flinkOperation.pollJobStatus(flinkInfo);
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
String message = String.format("suspend sort failed for groupId
[%s] ", groupId);
log.error(message, e);
return ListenerResult.fail(message + e.getMessage());
}
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
+ return ListenerResult.success();
+
}
}
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index 7a0371a323..7eda3df1be 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -17,32 +17,12 @@
package org.apache.inlong.manager.plugin.listener;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
-import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
* Listener of suspend stream sort.
@@ -57,78 +37,13 @@ public class SuspendStreamListener implements
SortOperateListener {
@Override
public boolean accept(WorkflowContext workflowContext) {
- ProcessForm processForm = workflowContext.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- log.info("not add suspend stream listener, not
StreamResourceProcessForm for groupId [{}]", groupId);
- return false;
- }
-
- StreamResourceProcessForm streamProcessForm =
(StreamResourceProcessForm) processForm;
- String streamId =
streamProcessForm.getStreamInfo().getInlongStreamId();
- if (streamProcessForm.getGroupOperateType() !=
GroupOperateType.SUSPEND) {
- log.info("not add suspend stream listener as the operate SUSPEND
for groupId [{}] streamId [{}]",
- groupId, streamId);
- return false;
- }
-
- log.info("add suspend stream listener for groupId [{}] streamId [{}]",
groupId, streamId);
- return true;
+ log.info("not need to suspend the sort task for InlongStream");
+ return false;
}
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- ProcessForm processForm = context.getProcessForm();
- StreamResourceProcessForm streamResourceProcessForm =
(StreamResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
- log.info("inlong group :{} ext info: {}",
groupInfo.getInlongGroupId(), groupExtList);
- InlongStreamInfo streamInfo =
streamResourceProcessForm.getStreamInfo();
- List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
- log.info("inlong stream :{} ext info: {}",
streamInfo.getInlongStreamId(), streamExtList);
-
- Map<String, String> kvConf = new HashMap<>();
- groupExtList.forEach(groupExtInfo ->
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
- streamExtList.forEach(extInfo -> {
- kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
- });
-
- final String groupId = streamInfo.getInlongGroupId();
- final String streamId = streamInfo.getInlongStreamId();
- String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
- }
-
- String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
- if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId
[%s] streamId [%s]", groupId, streamId);
- return ListenerResult.fail(message);
- }
-
- FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(jobId);
- String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
-
- FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
- try {
- flinkOperation.stop(flinkInfo);
- log.info("job suspend success for [{}]", jobId);
- return ListenerResult.success();
- } catch (Exception e) {
- flinkInfo.setException(true);
- flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
- flinkOperation.pollJobStatus(flinkInfo);
-
- String message = String.format("suspend sort failed for groupId
[%s] streamId[%s]", groupId, streamId);
- log.error(message, e);
- return ListenerResult.fail(message + e.getMessage());
- }
+ return ListenerResult.success();
}
}