This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2f01025 [Fix-3299][dao&server] Fix 3299,when Json string parsing
problem caused by non-standard json format. (#3552)
2f01025 is described below
commit 2f0102580268ff045e6042c3a691f1dee4c49962
Author: felix.wang <[email protected]>
AuthorDate: Wed Aug 19 16:27:39 2020 +0800
[Fix-3299][dao&server] Fix 3299,when Json string parsing problem caused by
non-standard json format. (#3552)
* #3299 Json string parsing problem caused by non-standard json format.
* #3299 Json string parsing problem caused by non-standard json format.
* #3299 Json string parsing problem caused by non-standard json format.
fix code style
* #3299 Json string parsing problem caused by non-standard json format.
fix code style
Co-authored-by: wangjianda <[email protected]>
---
.../org/apache/dolphinscheduler/dao/AlertDao.java | 81 ++++++++++-----
.../server/utils/AlertManager.java | 109 +++++++++------------
2 files changed, 104 insertions(+), 86 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
index 49b8c01..685d72c 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao;
+package org.apache.dolphinscheduler.dao;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.ShowType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -30,13 +29,17 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.util.ArrayList;
import java.util.Date;
+import java.util.LinkedHashMap;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
@Component
public class AlertDao extends AbstractBaseDao {
@@ -56,21 +59,23 @@ public class AlertDao extends AbstractBaseDao {
/**
* insert alert
+ *
* @param alert alert
* @return add alert result
*/
- public int addAlert(Alert alert){
+ public int addAlert(Alert alert) {
return alertMapper.insert(alert);
}
/**
* update alert
+ *
* @param alertStatus alertStatus
* @param log log
* @param id id
* @return update alert result
*/
- public int updateAlert(AlertStatus alertStatus,String log,int id){
+ public int updateAlert(AlertStatus alertStatus, String log, int id) {
Alert alert = alertMapper.selectById(id);
alert.setAlertStatus(alertStatus);
alert.setUpdateTime(new Date());
@@ -80,46 +85,61 @@ public class AlertDao extends AbstractBaseDao {
/**
* query user list by alert group id
+ *
* @param alerGroupId alerGroupId
* @return user list
*/
- public List<User> queryUserByAlertGroupId(int alerGroupId){
+ public List<User> queryUserByAlertGroupId(int alerGroupId) {
return userAlertGroupMapper.listUserByAlertgroupId(alerGroupId);
}
/**
* MasterServer or WorkerServer stoped
+ *
* @param alertgroupId alertgroupId
* @param host host
* @param serverType serverType
*/
- public void sendServerStopedAlert(int alertgroupId,String host,String
serverType){
+ public void sendServerStopedAlert(int alertgroupId, String host, String
serverType) {
Alert alert = new Alert();
- String content =
String.format("[{'type':'%s','host':'%s','event':'server down','warning
level':'serious'}]",
- serverType, host);
+ List<LinkedHashMap> serverStopList = new ArrayList<>(1);
+ LinkedHashMap<String, String> serverStopedMap = new LinkedHashMap();
+ serverStopedMap.put("type", serverType);
+ serverStopedMap.put("host", host);
+ serverStopedMap.put("event", "server down");
+ serverStopedMap.put("warning level", "serious");
+ serverStopList.add(serverStopedMap);
+ String content = JSONUtils.toJsonString(serverStopList);
alert.setTitle("Fault tolerance warning");
saveTaskTimeoutAlert(alert, content, alertgroupId, null, null);
}
/**
* process time out alert
+ *
* @param processInstance processInstance
* @param processDefinition processDefinition
*/
- public void sendProcessTimeoutAlert(ProcessInstance processInstance,
ProcessDefinition processDefinition){
+ public void sendProcessTimeoutAlert(ProcessInstance processInstance,
ProcessDefinition processDefinition) {
int alertgroupId = processInstance.getWarningGroupId();
String receivers = processDefinition.getReceivers();
String receiversCc = processDefinition.getReceiversCc();
Alert alert = new Alert();
- String content =
String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]",
- processInstance.getId(), processInstance.getName());
+ List<LinkedHashMap> processTimeoutList = new ArrayList<>(1);
+ LinkedHashMap<String, String> processTimeoutMap = new LinkedHashMap();
+ processTimeoutMap.put("id", String.valueOf(processInstance.getId()));
+ processTimeoutMap.put("name", processInstance.getName());
+ processTimeoutMap.put("event", "timeout");
+ processTimeoutMap.put("warnLevel", "middle");
+ processTimeoutList.add(processTimeoutMap);
+ String content = JSONUtils.toJsonString(processTimeoutList);
alert.setTitle("Process Timeout Warn");
saveTaskTimeoutAlert(alert, content, alertgroupId, receivers,
receiversCc);
}
- private void saveTaskTimeoutAlert(Alert alert, String content, int
alertgroupId,
- String receivers, String receiversCc){
+ private void saveTaskTimeoutAlert(Alert alert, String content, int
alertgroupId,
+ String receivers, String receiversCc) {
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
@@ -135,9 +155,9 @@ public class AlertDao extends AbstractBaseDao {
alertMapper.insert(alert);
}
-
/**
* task timeout warn
+ *
* @param alertgroupId alertgroupId
* @param receivers receivers
* @param receiversCc receiversCc
@@ -146,34 +166,45 @@ public class AlertDao extends AbstractBaseDao {
* @param taskId taskId
* @param taskName taskName
*/
- public void sendTaskTimeoutAlert(int alertgroupId,String receivers,String
receiversCc, int processInstanceId,
- String processInstanceName, int
taskId,String taskName){
+ public void sendTaskTimeoutAlert(int alertgroupId, String receivers,
String receiversCc, int processInstanceId,
+ String processInstanceName, int taskId,
String taskName) {
Alert alert = new Alert();
- String content = String.format("[{'process instance id':'%d','task
name':'%s','task id':'%d','task name':'%s'," +
- "'event':'timeout','warnLevel':'middle'}]",
processInstanceId, processInstanceName, taskId, taskName);
+ List<LinkedHashMap> taskTimeoutList = new ArrayList<>(1);
+ LinkedHashMap<String, String> taskTimeoutMap = new LinkedHashMap();
+ taskTimeoutMap.put("process instance id",
String.valueOf(processInstanceId));
+ taskTimeoutMap.put("process name", processInstanceName);
+ taskTimeoutMap.put("task id", String.valueOf(taskId));
+ taskTimeoutMap.put("task name", taskName);
+ taskTimeoutMap.put("event", "timeout");
+ taskTimeoutMap.put("warnLevel", "middle");
+ taskTimeoutList.add(taskTimeoutMap);
+ String content = JSONUtils.toJsonString(taskTimeoutList);
alert.setTitle("Task Timeout Warn");
saveTaskTimeoutAlert(alert, content, alertgroupId, receivers,
receiversCc);
}
/**
* list the alert information of waiting to be executed
+ *
* @return alert list
*/
- public List<Alert> listWaitExecutionAlert(){
+ public List<Alert> listWaitExecutionAlert() {
return alertMapper.listAlertByStatus(AlertStatus.WAIT_EXECUTION);
}
/**
* list user information by alert group id
+ *
* @param alertgroupId alertgroupId
* @return user list
*/
- public List<User> listUserByAlertgroupId(int alertgroupId){
+ public List<User> listUserByAlertgroupId(int alertgroupId) {
return userAlertGroupMapper.listUserByAlertgroupId(alertgroupId);
}
/**
* for test
+ *
* @return AlertMapper
*/
public AlertMapper getAlertMapper() {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
index 49ec9d3..08c6022 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
@@ -14,29 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.utils;
+package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* alert manager
*/
@@ -50,8 +51,7 @@ public class AlertManager {
/**
* alert dao
*/
- private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
-
+ private final AlertDao alertDao =
DaoFactory.getDaoInstance(AlertDao.class);
/**
* command type convert chinese
@@ -87,49 +87,36 @@ public class AlertManager {
}
/**
- * process instance format
- */
- private static final String PROCESS_INSTANCE_FORMAT =
- "\"id:%d\"," +
- "\"name:%s\"," +
- "\"job type: %s\"," +
- "\"state: %s\"," +
- "\"recovery:%s\"," +
- "\"run time: %d\"," +
- "\"start time: %s\"," +
- "\"end time: %s\"," +
- "\"host: %s\"" ;
-
- /**
* get process instance content
- * @param processInstance process instance
- * @param taskInstances task instance list
+ *
+ * @param processInstance process instance
+ * @param taskInstances task instance list
* @return process instance format content
*/
public String getContentProcessInstance(ProcessInstance processInstance,
- List<TaskInstance> taskInstances){
+ List<TaskInstance> taskInstances) {
String res = "";
- if(processInstance.getState().typeIsSuccess()){
- res = String.format(PROCESS_INSTANCE_FORMAT,
- processInstance.getId(),
- processInstance.getName(),
- getCommandCnName(processInstance.getCommandType()),
- processInstance.getState().toString(),
- processInstance.getRecovery().toString(),
- processInstance.getRunTimes(),
- DateUtils.dateToString(processInstance.getStartTime()),
- DateUtils.dateToString(processInstance.getEndTime()),
- processInstance.getHost()
-
- );
- res = "[" + res + "]";
- }else if(processInstance.getState().typeIsFailure()){
+ if (processInstance.getState().typeIsSuccess()) {
+ List<LinkedHashMap> successTaskList = new ArrayList<>(1);
+ LinkedHashMap<String, String> successTaskMap = new LinkedHashMap();
+ successTaskMap.put("id", String.valueOf(processInstance.getId()));
+ successTaskMap.put("name", processInstance.getName());
+ successTaskMap.put("job type",
getCommandCnName(processInstance.getCommandType()));
+ successTaskMap.put("state", processInstance.getState().toString());
+ successTaskMap.put("recovery",
processInstance.getRecovery().toString());
+ successTaskMap.put("run time",
String.valueOf(processInstance.getRunTimes()));
+ successTaskMap.put("start time",
DateUtils.dateToString(processInstance.getStartTime()));
+ successTaskMap.put("end time",
DateUtils.dateToString(processInstance.getEndTime()));
+ successTaskMap.put("host", processInstance.getHost());
+ successTaskList.add(successTaskMap);
+ res = JSONUtils.toJsonString(successTaskList);
+ } else if (processInstance.getState().typeIsFailure()) {
List<LinkedHashMap> failedTaskList = new ArrayList<>();
- for(TaskInstance task : taskInstances){
- if(task.getState().typeIsSuccess()){
+ for (TaskInstance task : taskInstances) {
+ if (task.getState().typeIsSuccess()) {
continue;
}
LinkedHashMap<String, String> failedTaskMap = new
LinkedHashMap();
@@ -154,15 +141,15 @@ public class AlertManager {
/**
* getting worker fault tolerant content
*
- * @param processInstance process instance
+ * @param processInstance process instance
* @param toleranceTaskList tolerance task list
* @return worker tolerance content
*/
- private String getWorkerToleranceContent(ProcessInstance processInstance,
List<TaskInstance> toleranceTaskList){
+ private String getWorkerToleranceContent(ProcessInstance processInstance,
List<TaskInstance> toleranceTaskList) {
- List<LinkedHashMap<String, String>> toleranceTaskInstanceList = new
ArrayList<>();
+ List<LinkedHashMap<String, String>> toleranceTaskInstanceList = new
ArrayList<>();
- for(TaskInstance taskInstance: toleranceTaskList){
+ for (TaskInstance taskInstance : toleranceTaskList) {
LinkedHashMap<String, String> toleranceWorkerContentMap = new
LinkedHashMap();
toleranceWorkerContentMap.put("process name",
processInstance.getName());
toleranceWorkerContentMap.put("task name", taskInstance.getName());
@@ -176,11 +163,11 @@ public class AlertManager {
/**
* send worker alert fault tolerance
*
- * @param processInstance process instance
+ * @param processInstance process instance
* @param toleranceTaskList tolerance task list
*/
- public void sendAlertWorkerToleranceFault(ProcessInstance processInstance,
List<TaskInstance> toleranceTaskList){
- try{
+ public void sendAlertWorkerToleranceFault(ProcessInstance processInstance,
List<TaskInstance> toleranceTaskList) {
+ try {
Alert alert = new Alert();
alert.setTitle("worker fault tolerance");
alert.setShowType(ShowType.TABLE);
@@ -188,13 +175,13 @@ public class AlertManager {
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setCreateTime(new Date());
- alert.setAlertGroupId(processInstance.getWarningGroupId() == null
? 1:processInstance.getWarningGroupId());
+ alert.setAlertGroupId(processInstance.getWarningGroupId() == null
? 1 : processInstance.getWarningGroupId());
alert.setReceivers(processInstance.getProcessDefinition().getReceivers());
alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc());
alertDao.addAlert(alert);
logger.info("add alert to db , alert : {}", alert.toString());
- }catch (Exception e){
+ } catch (Exception e) {
logger.error("send alert failed:{} ", e.getMessage());
}
@@ -202,40 +189,40 @@ public class AlertManager {
/**
* send process instance alert
- * @param processInstance process instance
- * @param taskInstances task instance list
+ *
+ * @param processInstance process instance
+ * @param taskInstances task instance list
*/
public void sendAlertProcessInstance(ProcessInstance processInstance,
- List<TaskInstance> taskInstances){
+ List<TaskInstance> taskInstances) {
boolean sendWarnning = false;
WarningType warningType = processInstance.getWarningType();
- switch (warningType){
+ switch (warningType) {
case ALL:
- if(processInstance.getState().typeIsFinished()){
+ if (processInstance.getState().typeIsFinished()) {
sendWarnning = true;
}
break;
case SUCCESS:
- if(processInstance.getState().typeIsSuccess()){
+ if (processInstance.getState().typeIsSuccess()) {
sendWarnning = true;
}
break;
case FAILURE:
- if(processInstance.getState().typeIsFailure()){
+ if (processInstance.getState().typeIsFailure()) {
sendWarnning = true;
}
break;
- default:
+ default:
}
- if(!sendWarnning){
+ if (!sendWarnning) {
return;
}
Alert alert = new Alert();
-
String cmdName = getCommandCnName(processInstance.getCommandType());
- String success = processInstance.getState().typeIsSuccess() ?
"success" :"failed";
+ String success = processInstance.getState().typeIsSuccess() ?
"success" : "failed";
alert.setTitle(cmdName + " " + success);
ShowType showType = processInstance.getState().typeIsSuccess() ?
ShowType.TEXT : ShowType.TABLE;
alert.setShowType(showType);
@@ -254,7 +241,7 @@ public class AlertManager {
/**
* send process timeout alert
*
- * @param processInstance process instance
+ * @param processInstance process instance
* @param processDefinition process definition
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance,
ProcessDefinition processDefinition) {