This is an automated email from the ASF dual-hosted git repository. liudongkai pushed a commit to branch 3.0.0-beta-2-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 476f2395dc2841f2f148cb40de0dbfe10e22536a Author: 陈家名 <[email protected]> AuthorDate: Tue Jun 14 13:53:18 2022 +0800 [fix][Python] Support same task name in project defferent process definition (#10428) close: #10431 (cherry picked from commit b86dc53ad10cb5c4d76f5e85c38e5a5659a349fd) --- .../dolphinscheduler/api/python/PythonGateway.java | 14 ++- .../api/service/TaskDefinitionService.java | 2 + .../service/impl/TaskDefinitionServiceImpl.java | 5 +- .../api/python/PythonGatewayTest.java | 123 +++++++++++++++++++++ .../api/service/TaskDefinitionServiceImplTest.java | 6 +- .../dao/mapper/TaskDefinitionMapper.java | 2 + .../dao/mapper/TaskDefinitionMapper.xml | 20 +++- .../dao/mapper/TaskDefinitionMapperTest.java | 25 ++++- .../src/pydolphinscheduler/core/task.py | 2 +- 9 files changed, 185 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 817f411854..54962b523d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -166,7 +166,7 @@ public class PythonGateway { return taskDefinitionService.genTaskCodeList(genNum); } - public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException { + public Map<String, Long> getCodeAndVersion(String projectName, String processDefinitionName, String taskName) throws CodeGenerateUtils.CodeGenerateException { Project project = projectMapper.queryByName(projectName); Map<String, Long> result = new HashMap<>(); // project do not exists, mean task not exists too, so we should directly return init value @@ -175,7 +175,15 @@ public class PythonGateway { result.put("version", 0L); return result; } - TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName); + + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); + if (processDefinition == null) { + String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + + TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName); if (taskDefinition == null) { result.put("code", CodeGenerateUtils.getInstance().genCode()); result.put("version", 0L); @@ -520,7 +528,7 @@ public class PythonGateway { result.put("processDefinitionCode", processDefinition.getCode()); if (taskName != null) { - TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskName); + TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, processDefinition.getCode(), taskName); result.put("taskDefinitionCode", taskDefinition.getCode()); } return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index 5d2ba85b02..a715b68e97 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -60,10 +60,12 @@ public interface TaskDefinitionService { * * @param loginUser login user * @param projectCode project code + * @param processCode process code * @param taskName task name */ Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, + long processCode, String taskName); /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index cd617958e0..15105acfe8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -270,10 +270,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * * @param loginUser login user * @param projectCode project code + * @param processCode process code * @param taskName task name */ @Override - public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, String taskName) { + public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, String taskName) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -281,7 +282,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName); + TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName); if (taskDefinition == null) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName); } else { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java new file mode 100644 index 0000000000..7d8b6efabc --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.python; + +import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Date; +import java.util.Map; + +/** + * python gate test + */ +@RunWith(MockitoJUnitRunner.class) +public class PythonGatewayTest { + + @InjectMocks + private PythonGateway pythonGateway; + + @Mock + private ProjectMapper projectMapper; + + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + + @Mock + private TaskDefinitionMapper taskDefinitionMapper; + + @Test + public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException { + Project project = getTestProject(); + Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project); + + ProcessDefinition processDefinition = getTestProcessDefinition(); + Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition); + + TaskDefinition taskDefinition = getTestTaskDefinition(); + Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition); + + Map<String, Long> result = pythonGateway.getCodeAndVersion(project.getName(), processDefinition.getName(), taskDefinition.getName()); + Assert.assertEquals(result.get("code").longValue(), taskDefinition.getCode()); + } + + @Test + public void testGetDependentInfo() { + Project project = getTestProject(); + Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project); + + ProcessDefinition processDefinition = getTestProcessDefinition(); + Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition); + + TaskDefinition taskDefinition = getTestTaskDefinition(); + Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition); + + Map<String, Object> result = pythonGateway.getDependentInfo(project.getName(), processDefinition.getName(), taskDefinition.getName()); + Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode()); + } + + private Project getTestProject() { + Project project = new Project(); + project.setName("ut-project"); + project.setUserId(111); + project.setCode(1L); + project.setCreateTime(new Date()); + project.setUpdateTime(new Date()); + return project; + } + + private ProcessDefinition getTestProcessDefinition() { + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setCode(1L); + processDefinition.setName("ut-process-definition"); + processDefinition.setProjectCode(1L); + processDefinition.setUserId(111); + processDefinition.setUpdateTime(new Date()); + processDefinition.setCreateTime(new Date()); + return processDefinition; + } + + private TaskDefinition getTestTaskDefinition() { + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setCode(888888L); + taskDefinition.setName("ut-task-definition"); + taskDefinition.setProjectCode(1L); + taskDefinition.setTaskType("SHELL"); + taskDefinition.setUserId(111); + taskDefinition.setResourceIds("1"); + taskDefinition.setWorkerGroup("default"); + taskDefinition.setEnvironmentCode(1L); + taskDefinition.setVersion(1); + taskDefinition.setCreateTime(new Date()); + taskDefinition.setUpdateTime(new Date()); + return taskDefinition; + } + +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index 9aade5555c..676688a3ec 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -145,7 +145,7 @@ public class TaskDefinitionServiceImplTest { public void queryTaskDefinitionByName() { String taskName = "task"; long projectCode = 1L; - + long processCode = 1L; Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); @@ -157,11 +157,11 @@ public class TaskDefinitionServiceImplTest { putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), taskName)) + Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName)) .thenReturn(new TaskDefinition()); Map<String, Object> relation = taskDefinitionService - .queryTaskDefinitionByName(loginUser, projectCode, taskName); + .queryTaskDefinitionByName(loginUser, projectCode, processCode, taskName); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index 304e623a0a..c426da6b04 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -41,10 +41,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> { * query task definition by name * * @param projectCode projectCode + * @param processCode processCode * @param name name * @return task definition */ TaskDefinition queryByName(@Param("projectCode") long projectCode, + @Param("processCode") long processCode, @Param("name") String name); /** diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index 5c889d1b07..255be38b2f 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -23,12 +23,24 @@ worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time, task_group_id,task_group_priority </sql> + <sql id="baseSqlV2"> + ${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id, + ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code, + ${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout, + ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id, + ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max + </sql> <select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> select - <include refid="baseSql"/> - from t_ds_task_definition - WHERE project_code = #{projectCode} - and name = #{name} + <include refid="baseSqlV2"> + <property name="alias" value="td"/> + </include> + from t_ds_task_definition td + join t_ds_process_task_relation ptr on ptr.project_code = td.project_code + where td.project_code = #{projectCode} + and td.name = #{name} + and ptr.process_definition_code = #{processCode} + and td.code = ptr.post_task_code </select> <select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> select diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java index 757f751dd2..3d16e2e44d 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.User; @@ -36,6 +37,9 @@ public class TaskDefinitionMapperTest extends BaseDaoTest { @Autowired private TaskDefinitionMapper taskDefinitionMapper; + @Autowired + private ProcessTaskRelationMapper processTaskRelationMapper; + @Autowired private UserMapper userMapper; @@ -60,6 +64,24 @@ public class TaskDefinitionMapperTest extends BaseDaoTest { return taskDefinition; } + /** + * insert + * + * @return ProcessDefinition + */ + private ProcessTaskRelation insertTaskRelation(long postTaskCode) { + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setName("def 1"); + processTaskRelation.setProjectCode(1L); + processTaskRelation.setProcessDefinitionCode(1L); + processTaskRelation.setPostTaskCode(postTaskCode); + processTaskRelation.setPreTaskCode(0L); + processTaskRelation.setUpdateTime(new Date()); + processTaskRelation.setCreateTime(new Date()); + processTaskRelationMapper.insert(processTaskRelation); + return processTaskRelation; + } + @Test public void testInsert() { TaskDefinition taskDefinition = insertOne(); @@ -69,7 +91,8 @@ public class TaskDefinitionMapperTest extends BaseDaoTest { @Test public void testQueryByDefinitionName() { TaskDefinition taskDefinition = insertOne(); - TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode() + ProcessTaskRelation processTaskRelation = insertTaskRelation(taskDefinition.getCode()); + TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode(), processTaskRelation.getProcessDefinitionCode() , taskDefinition.getName()); Assert.assertNotNull(result); diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 599b979369..08a1cdf7e8 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -273,7 +273,7 @@ class Task(Base): # TODO get code from specific project process definition and task name gateway = launch_gateway() result = gateway.entry_point.getCodeAndVersion( - self.process_definition._project, self.name + self.process_definition._project, self.process_definition.name, self.name ) # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # gateway_result_checker(result)
