This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new bc22ae7 [Improvement][Task] Remove deprecated TaskRecordDao and
simply the after() method in the AbstractTask class (#5492)
bc22ae7 is described below
commit bc22ae7c91c9cbd7c971796ba3a45358c2f11864
Author: Yao WANG <[email protected]>
AuthorDate: Tue May 18 17:00:03 2021 +0800
[Improvement][Task] Remove deprecated TaskRecordDao and simply the after()
method in the AbstractTask class (#5492)
* remove the depreated data quality module
* fix lint
Co-authored-by: Yao <[email protected]>
---
.../api/controller/TaskRecordController.java | 123 ---------
.../api/service/TaskRecordService.java | 46 ----
.../api/service/impl/TaskRecordServiceImpl.java | 86 ------
.../api/controller/TaskRecordControllerTest.java | 95 -------
.../apache/dolphinscheduler/common/Constants.java | 13 -
.../apache/dolphinscheduler/dao/TaskRecordDao.java | 291 ---------------------
.../server/worker/task/AbstractTask.java | 51 +---
7 files changed, 7 insertions(+), 698 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java
deleted file mode 100644
index 38867fe..0000000
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.controller;
-
-import static
org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_RECORD_LIST_PAGING_ERROR;
-
-import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
-import org.apache.dolphinscheduler.api.exceptions.ApiException;
-import org.apache.dolphinscheduler.api.service.TaskRecordService;
-import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.dao.entity.User;
-
-import java.util.Map;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestAttribute;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
-
-import springfox.documentation.annotations.ApiIgnore;
-
-/**
- * task record controller
- */
-@ApiIgnore
-@RestController
-@RequestMapping("/projects/task-record")
-public class TaskRecordController extends BaseController {
-
- @Autowired
- TaskRecordService taskRecordService;
-
- /**
- * query task record list page
- *
- * @param loginUser login user
- * @param taskName task name
- * @param state state
- * @param sourceTable source table
- * @param destTable destination table
- * @param taskDate task date
- * @param startTime start time
- * @param endTime end time
- * @param pageNo page number
- * @param pageSize page size
- * @return task record list
- */
- @GetMapping("/list-paging")
- @ResponseStatus(HttpStatus.OK)
- @ApiException(QUERY_TASK_RECORD_LIST_PAGING_ERROR)
- @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result queryTaskRecordListPaging(@ApiIgnore @RequestAttribute(value
= Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "taskName",
required = false) String taskName,
- @RequestParam(value = "state",
required = false) String state,
- @RequestParam(value =
"sourceTable", required = false) String sourceTable,
- @RequestParam(value = "destTable",
required = false) String destTable,
- @RequestParam(value = "taskDate",
required = false) String taskDate,
- @RequestParam(value = "startDate",
required = false) String startTime,
- @RequestParam(value = "endDate",
required = false) String endTime,
- @RequestParam("pageNo") Integer
pageNo,
- @RequestParam("pageSize") Integer
pageSize
- ) {
-
- Map<String, Object> result =
taskRecordService.queryTaskRecordListPaging(false, taskName, startTime,
taskDate, sourceTable, destTable, endTime, state, pageNo, pageSize);
- return returnDataListPaging(result);
- }
-
- /**
- * query history task record list paging
- *
- * @param loginUser login user
- * @param taskName task name
- * @param state state
- * @param sourceTable source table
- * @param destTable destination table
- * @param taskDate task date
- * @param startTime start time
- * @param endTime end time
- * @param pageNo page number
- * @param pageSize page size
- * @return history task record list
- */
- @GetMapping("/history-list-paging")
- @ResponseStatus(HttpStatus.OK)
- @ApiException(QUERY_TASK_RECORD_LIST_PAGING_ERROR)
- @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result queryHistoryTaskRecordListPaging(@ApiIgnore
@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam(value =
"taskName", required = false) String taskName,
- @RequestParam(value =
"state", required = false) String state,
- @RequestParam(value =
"sourceTable", required = false) String sourceTable,
- @RequestParam(value =
"destTable", required = false) String destTable,
- @RequestParam(value =
"taskDate", required = false) String taskDate,
- @RequestParam(value =
"startDate", required = false) String startTime,
- @RequestParam(value =
"endDate", required = false) String endTime,
- @RequestParam("pageNo")
Integer pageNo,
- @RequestParam("pageSize")
Integer pageSize
- ) {
-
- Map<String, Object> result =
taskRecordService.queryTaskRecordListPaging(true, taskName, startTime,
taskDate, sourceTable, destTable, endTime, state, pageNo, pageSize);
- return returnDataListPaging(result);
- }
-
-}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java
deleted file mode 100644
index 8c8ad0a..0000000
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.service;
-
-import java.util.Map;
-
-/**
- * task record service
- */
-public interface TaskRecordService {
-
- /**
- * query task record list paging
- *
- * @param taskName task name
- * @param state state
- * @param sourceTable source table
- * @param destTable destination table
- * @param taskDate task date
- * @param startDate start time
- * @param endDate end time
- * @param pageNo page numbere
- * @param pageSize page size
- * @param isHistory is history
- * @return task record list
- */
- Map<String,Object> queryTaskRecordListPaging(boolean isHistory, String
taskName, String startDate,
- String taskDate, String
sourceTable,
- String destTable, String
endDate,
- String state, Integer pageNo,
Integer pageSize);
-}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java
deleted file mode 100644
index c755da0..0000000
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.service.impl;
-
-import static
org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HISTORY_HIVE_LOG;
-import static
org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HIVE_LOG;
-
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.service.TaskRecordService;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.dao.TaskRecordDao;
-import org.apache.dolphinscheduler.dao.entity.TaskRecord;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.springframework.stereotype.Service;
-
-/**
- * task record service impl
- */
-@Service
-public class TaskRecordServiceImpl extends BaseServiceImpl implements
TaskRecordService {
-
- /**
- * query task record list paging
- *
- * @param taskName task name
- * @param state state
- * @param sourceTable source table
- * @param destTable destination table
- * @param taskDate task date
- * @param startDate start time
- * @param endDate end time
- * @param pageNo page numbere
- * @param pageSize page size
- * @param isHistory is history
- * @return task record list
- */
- @Override
- public Map<String,Object> queryTaskRecordListPaging(boolean isHistory,
String taskName, String startDate,
- String taskDate,
String sourceTable,
- String destTable,
String endDate,
- String state, Integer
pageNo, Integer pageSize) {
- Map<String, Object> result = new HashMap<>();
- PageInfo<TaskRecord> pageInfo = new PageInfo<>(pageNo, pageSize);
-
- Map<String, String> map = new HashMap<>();
- map.put("taskName", taskName);
- map.put("taskDate", taskDate);
- map.put("state", state);
- map.put("sourceTable", sourceTable);
- map.put("targetTable", destTable);
- map.put("startTime", startDate);
- map.put("endTime", endDate);
- map.put("offset", pageInfo.getStart().toString());
- map.put("pageSize", pageInfo.getPageSize().toString());
-
- String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG :
TASK_RECORD_TABLE_HIVE_LOG;
- int count = TaskRecordDao.countTaskRecord(map, table);
- List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map,
table);
- pageInfo.setTotalCount(count);
- pageInfo.setLists(recordList);
- result.put(Constants.DATA_LIST, pageInfo);
- putMsg(result, Status.SUCCESS);
-
- return result;
- }
-}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskRecordControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskRecordControllerTest.java
deleted file mode 100644
index a78ac06..0000000
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskRecordControllerTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.controller;
-
-import static
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
-import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.MediaType;
-import org.springframework.test.web.servlet.MvcResult;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-/**
- * task record controller test
- */
-public class TaskRecordControllerTest extends AbstractControllerTest {
-
- private static final Logger logger =
LoggerFactory.getLogger(TaskRecordControllerTest.class);
-
- @Test
- public void testQueryTaskRecordListPaging() throws Exception {
- MultiValueMap<String, String> paramsMap = new
LinkedMultiValueMap<>();
- paramsMap.add("taskName","taskName");
- paramsMap.add("state","state");
- paramsMap.add("sourceTable","");
- paramsMap.add("destTable","");
- paramsMap.add("taskDate","");
- paramsMap.add("startDate","2019-12-16 00:00:00");
- paramsMap.add("endDate","2019-12-17 00:00:00");
- paramsMap.add("pageNo","1");
- paramsMap.add("pageSize","30");
-
- MvcResult mvcResult =
mockMvc.perform(get("/projects/task-record/list-paging")
- .header(SESSION_ID, sessionId)
- .params(paramsMap))
- .andExpect(status().isOk())
-
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
- .andReturn();
-
- Result result =
JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(),
Result.class);
-
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
- logger.info(mvcResult.getResponse().getContentAsString());
- }
-
-
- @Test
- public void testQueryHistoryTaskRecordListPaging() throws Exception {
- MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
- paramsMap.add("taskName","taskName");
- paramsMap.add("state","state");
- paramsMap.add("sourceTable","");
- paramsMap.add("destTable","");
- paramsMap.add("taskDate","");
- paramsMap.add("startDate","2019-12-16 00:00:00");
- paramsMap.add("endDate","2019-12-17 00:00:00");
- paramsMap.add("pageNo","1");
- paramsMap.add("pageSize","30");
-
- MvcResult mvcResult =
mockMvc.perform(get("/projects/task-record/history-list-paging")
- .header(SESSION_ID, sessionId)
- .params(paramsMap))
- .andExpect(status().isOk())
-
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
- .andReturn();
-
- Result result =
JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(),
Result.class);
-
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
- logger.info(mvcResult.getResponse().getContentAsString());
-
- }
-}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 1544f2b..4ef30a2 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -432,14 +432,6 @@ public final class Constants {
*/
public static final String DATASOURCE_PROPERTIES =
"/datasource.properties";
- public static final String TASK_RECORD_URL = "task.record.datasource.url";
-
- public static final String TASK_RECORD_FLAG = "task.record.flag";
-
- public static final String TASK_RECORD_USER =
"task.record.datasource.username";
-
- public static final String TASK_RECORD_PWD =
"task.record.datasource.password";
-
public static final String DEFAULT = "Default";
public static final String USER = "user";
public static final String PASSWORD = "password";
@@ -448,11 +440,6 @@ public final class Constants {
public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
- public static final String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd";
-
- public static final String TASK_RECORD_TABLE_HISTORY_HIVE_LOG =
"eamp_hive_hist_log_hd";
-
-
/**
* command parameter keys
*/
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
deleted file mode 100644
index af8d9af..0000000
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.dao;
-
-import static
org.apache.dolphinscheduler.common.Constants.DATASOURCE_PROPERTIES;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskRecord;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * task record dao
- */
-public class TaskRecordDao {
-
-
- private static Logger logger =
LoggerFactory.getLogger(TaskRecordDao.class.getName());
-
- static {
- PropertyUtils.loadPropertyFile(DATASOURCE_PROPERTIES);
- }
-
- /**
- * get task record flag
- *
- * @return whether startup taskrecord
- */
- public static boolean getTaskRecordFlag() {
- return PropertyUtils.getBoolean(Constants.TASK_RECORD_FLAG, false);
- }
-
- /**
- * create connection
- *
- * @return connection
- */
- private static Connection getConn() {
- if (!getTaskRecordFlag()) {
- return null;
- }
- String driver = "com.mysql.jdbc.Driver";
- String url = PropertyUtils.getString(Constants.TASK_RECORD_URL);
- String username = PropertyUtils.getString(Constants.TASK_RECORD_USER);
- String password = PropertyUtils.getString(Constants.TASK_RECORD_PWD);
- Connection conn = null;
- try {
- //classLoader,load driver
- Class.forName(driver);
- conn = DriverManager.getConnection(url, username, password);
- } catch (ClassNotFoundException e) {
- logger.error("Class not found Exception ", e);
- } catch (SQLException e) {
- logger.error("SQL Exception ", e);
- }
- return conn;
- }
-
- /**
- * generate where sql string
- *
- * @param filterMap filterMap
- * @return sql string
- */
- private static String getWhereString(Map<String, String> filterMap) {
- if (filterMap.size() == 0) {
- return "";
- }
-
- String result = " where 1=1 ";
-
- Object taskName = filterMap.get("taskName");
- if (taskName != null && StringUtils.isNotEmpty(taskName.toString())) {
- result += " and PROC_NAME like concat('%', '" +
taskName.toString() + "', '%') ";
- }
-
- Object taskDate = filterMap.get("taskDate");
- if (taskDate != null && StringUtils.isNotEmpty(taskDate.toString())) {
- result += " and PROC_DATE='" + taskDate.toString() + "'";
- }
-
- Object state = filterMap.get("state");
- if (state != null && StringUtils.isNotEmpty(state.toString())) {
- result += " and NOTE='" + state.toString() + "'";
- }
-
- Object sourceTable = filterMap.get("sourceTable");
- if (sourceTable != null &&
StringUtils.isNotEmpty(sourceTable.toString())) {
- result += " and SOURCE_TAB like concat('%', '" +
sourceTable.toString() + "', '%')";
- }
-
- Object targetTable = filterMap.get("targetTable");
- if (sourceTable != null &&
StringUtils.isNotEmpty(targetTable.toString())) {
- result += " and TARGET_TAB like concat('%', '" +
targetTable.toString() + "', '%') ";
- }
-
- Object start = filterMap.get("startTime");
- if (start != null && StringUtils.isNotEmpty(start.toString())) {
- result += " and STARTDATE>='" + start.toString() + "'";
- }
-
- Object end = filterMap.get("endTime");
- if (end != null && StringUtils.isNotEmpty(end.toString())) {
- result += " and ENDDATE>='" + end.toString() + "'";
- }
- return result;
- }
-
- /**
- * count task record
- *
- * @param filterMap filterMap
- * @param table table
- * @return task record count
- */
- public static int countTaskRecord(Map<String, String> filterMap, String
table) {
-
- int count = 0;
- Connection conn = null;
- PreparedStatement pstmt = null;
- ResultSet rs = null;
- try {
- conn = getConn();
- if (conn == null) {
- return count;
- }
- String sql = String.format("select count(1) as count from %s",
table);
- sql += getWhereString(filterMap);
- pstmt = conn.prepareStatement(sql);
- rs = pstmt.executeQuery();
- while (rs.next()) {
- count = rs.getInt("count");
- break;
- }
- } catch (SQLException e) {
- logger.error("Exception ", e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
- }
- return count;
- }
-
- /**
- * query task record by filter map paging
- *
- * @param filterMap filterMap
- * @param table table
- * @return task record list
- */
- public static List<TaskRecord> queryAllTaskRecord(Map<String, String>
filterMap, String table) {
-
- String sql = String.format("select * from %s", table);
- sql += getWhereString(filterMap);
-
- int offset = Integer.parseInt(filterMap.get("offset"));
- int pageSize = Integer.parseInt(filterMap.get("pageSize"));
- sql += String.format(" order by STARTDATE desc limit %d,%d", offset,
pageSize);
-
- List<TaskRecord> recordList = new ArrayList<>();
- try {
- recordList = getQueryResult(sql);
- } catch (Exception e) {
- logger.error("Exception ", e);
- }
- return recordList;
- }
-
- /**
- * convert result set to task record
- *
- * @param resultSet resultSet
- * @return task record
- * @throws SQLException if error throws SQLException
- */
- private static TaskRecord convertToTaskRecord(ResultSet resultSet) throws
SQLException {
-
- TaskRecord taskRecord = new TaskRecord();
- taskRecord.setId(resultSet.getInt("ID"));
- taskRecord.setProcId(resultSet.getInt("PROC_ID"));
- taskRecord.setProcName(resultSet.getString("PROC_NAME"));
- taskRecord.setProcDate(resultSet.getString("PROC_DATE"));
-
taskRecord.setStartTime(DateUtils.stringToDate(resultSet.getString("STARTDATE")));
-
taskRecord.setEndTime(DateUtils.stringToDate(resultSet.getString("ENDDATE")));
- taskRecord.setResult(resultSet.getString("RESULT"));
- taskRecord.setDuration(resultSet.getInt("DURATION"));
- taskRecord.setNote(resultSet.getString("NOTE"));
- taskRecord.setSchema(resultSet.getString("SCHEMA"));
- taskRecord.setJobId(resultSet.getString("JOB_ID"));
- taskRecord.setSourceTab(resultSet.getString("SOURCE_TAB"));
- taskRecord.setSourceRowCount(resultSet.getLong("SOURCE_ROW_COUNT"));
- taskRecord.setTargetTab(resultSet.getString("TARGET_TAB"));
- taskRecord.setTargetRowCount(resultSet.getLong("TARGET_ROW_COUNT"));
- taskRecord.setErrorCode(resultSet.getString("ERROR_CODE"));
- return taskRecord;
- }
-
- /**
- * query task list by select sql
- *
- * @param selectSql select sql
- * @return task record list
- */
- private static List<TaskRecord> getQueryResult(String selectSql) {
- List<TaskRecord> recordList = new ArrayList<>();
- Connection conn = null;
- PreparedStatement pstmt = null;
- ResultSet rs = null;
- try {
- conn = getConn();
- if (conn == null) {
- return recordList;
- }
- pstmt = conn.prepareStatement(selectSql);
- rs = pstmt.executeQuery();
-
- while (rs.next()) {
- TaskRecord taskRecord = convertToTaskRecord(rs);
- recordList.add(taskRecord);
- }
- } catch (SQLException e) {
- logger.error("Exception ", e);
- } finally {
- ConnectionUtils.releaseResource(rs, pstmt, conn);
- }
- return recordList;
- }
-
- /**
- * according to procname and procdate query task record
- *
- * @param procName procName
- * @param procDate procDate
- * @return task record status
- */
- public static TaskRecordStatus getTaskRecordState(String procName, String
procDate) {
- String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE
PROC_NAME='%s' and PROC_DATE like '%s'"
- , procName, procDate + "%");
- List<TaskRecord> taskRecordList = getQueryResult(sql);
-
- // contains no record and sql exception
- if (CollectionUtils.isEmpty(taskRecordList)) {
- // exception
- return TaskRecordStatus.EXCEPTION;
- } else if (taskRecordList.size() > 1) {
- return TaskRecordStatus.EXCEPTION;
- } else {
- TaskRecord taskRecord = taskRecordList.get(0);
- if (taskRecord == null) {
- return TaskRecordStatus.EXCEPTION;
- }
- Long targetRowCount = taskRecord.getTargetRowCount();
- if (targetRowCount <= 0) {
- return TaskRecordStatus.FAILURE;
- } else {
- return TaskRecordStatus.SUCCESS;
- }
-
- }
- }
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index a5221dd..7454f49 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -20,20 +20,12 @@ package org.apache.dolphinscheduler.server.worker.task;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
-import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.utils.ParamUtils;
import java.util.List;
-import java.util.Map;
import org.slf4j.Logger;
@@ -110,6 +102,13 @@ public abstract class AbstractTask {
*/
public abstract void handle() throws Exception;
+ /**
+ * result processing
+ *
+ * @throws Exception exception
+ */
+ public void after() throws Exception {
+ }
/**
* cancel application
@@ -187,42 +186,6 @@ public abstract class AbstractTask {
*/
public abstract AbstractParameters getParameters();
-
- /**
- * result processing
- */
- public void after() {
- if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) {
- // task recor flat : if true , start up qianfan
- if (TaskRecordDao.getTaskRecordFlag() &&
typeIsNormalTask(taskExecutionContext.getTaskType())) {
- AbstractParameters params =
TaskParametersUtils.getParameters(taskExecutionContext.getTaskType(),
taskExecutionContext.getTaskParams());
-
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- params.getLocalParametersMap(),
-
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
- if (paramsMap != null && !paramsMap.isEmpty()
- && paramsMap.containsKey("v_proc_date")) {
- String vProcDate = paramsMap.get("v_proc_date").getValue();
- if (!StringUtils.isEmpty(vProcDate)) {
- TaskRecordStatus taskRecordState =
TaskRecordDao.getTaskRecordState(taskExecutionContext.getTaskName(), vProcDate);
- logger.info("task record status : {}",
taskRecordState);
- if (taskRecordState == TaskRecordStatus.FAILURE) {
- setExitStatusCode(Constants.EXIT_CODE_FAILURE);
- }
- }
- }
- }
-
- } else if (getExitStatusCode() == Constants.EXIT_CODE_KILL) {
- setExitStatusCode(Constants.EXIT_CODE_KILL);
- } else {
- setExitStatusCode(Constants.EXIT_CODE_FAILURE);
- }
- }
-
private boolean typeIsNormalTask(String taskType) {
return !(TaskType.SUB_PROCESS.getDesc().equalsIgnoreCase(taskType) ||
TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType));
}