This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b86dc53ad1 [fix][Python] Support same task name in project different 
process definition (#10428)
b86dc53ad1 is described below

commit b86dc53ad10cb5c4d76f5e85c38e5a5659a349fd
Author: 陈家名 <[email protected]>
AuthorDate: Tue Jun 14 13:53:18 2022 +0800

    [fix][Python] Support same task name in project different process 
definition (#10428)
    
    close: #10431
---
 .../dolphinscheduler/api/python/PythonGateway.java |  14 ++-
 .../api/service/TaskDefinitionService.java         |   2 +
 .../service/impl/TaskDefinitionServiceImpl.java    |   5 +-
 .../api/python/PythonGatewayTest.java              | 123 +++++++++++++++++++++
 .../api/service/TaskDefinitionServiceImplTest.java |   6 +-
 .../dao/mapper/TaskDefinitionMapper.java           |   2 +
 .../dao/mapper/TaskDefinitionMapper.xml            |  20 +++-
 .../dao/mapper/TaskDefinitionMapperTest.java       |  25 ++++-
 .../src/pydolphinscheduler/core/task.py            |   2 +-
 9 files changed, 185 insertions(+), 14 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index e142c421b0..9b89d9636b 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -166,7 +166,7 @@ public class PythonGateway {
         return taskDefinitionService.genTaskCodeList(genNum);
     }
 
-    public Map<String, Long> getCodeAndVersion(String projectName, String 
taskName) throws CodeGenerateUtils.CodeGenerateException {
+    public Map<String, Long> getCodeAndVersion(String projectName, String 
processDefinitionName, String taskName) throws 
CodeGenerateUtils.CodeGenerateException {
         Project project = projectMapper.queryByName(projectName);
         Map<String, Long> result = new HashMap<>();
         // project do not exists, mean task not exists too, so we should 
directly return init value
@@ -175,7 +175,15 @@ public class PythonGateway {
             result.put("version", 0L);
             return result;
         }
-        TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(project.getCode(), taskName);
+
+        ProcessDefinition processDefinition = 
processDefinitionMapper.queryByDefineName(project.getCode(), 
processDefinitionName);
+        if (processDefinition == null) {
+            String msg = String.format("Can not find valid process definition 
by name %s", processDefinitionName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+
+        TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(project.getCode(), 
processDefinition.getCode(), taskName);
         if (taskDefinition == null) {
             result.put("code", CodeGenerateUtils.getInstance().genCode());
             result.put("version", 0L);
@@ -520,7 +528,7 @@ public class PythonGateway {
         result.put("processDefinitionCode", processDefinition.getCode());
 
         if (taskName != null) {
-            TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(projectCode, taskName);
+            TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(projectCode, processDefinition.getCode(), 
taskName);
             result.put("taskDefinitionCode", taskDefinition.getCode());
         }
         return result;
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
index 5d2ba85b02..a715b68e97 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
@@ -60,10 +60,12 @@ public interface TaskDefinitionService {
      *
      * @param loginUser login user
      * @param projectCode project code
+     * @param processCode process code
      * @param taskName task name
      */
     Map<String, Object> queryTaskDefinitionByName(User loginUser,
                                                   long projectCode,
+                                                  long processCode,
                                                   String taskName);
 
     /**
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 92fb74d787..30f2d0235c 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -272,10 +272,11 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
      *
      * @param loginUser login user
      * @param projectCode project code
+     * @param processCode process code
      * @param taskName task name
      */
     @Override
-    public Map<String, Object> queryTaskDefinitionByName(User loginUser, long 
projectCode, String taskName) {
+    public Map<String, Object> queryTaskDefinitionByName(User loginUser, long 
projectCode, long processCode, String taskName) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
         Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION);
@@ -283,7 +284,7 @@ public class TaskDefinitionServiceImpl extends 
BaseServiceImpl implements TaskDe
             return result;
         }
 
-        TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(project.getCode(), taskName);
+        TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName);
         if (taskDefinition == null) {
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName);
         } else {
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
new file mode 100644
index 0000000000..7d8b6efabc
--- /dev/null
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.python;
+
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Unit tests for PythonGateway
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PythonGatewayTest {
+
+    @InjectMocks
+    private PythonGateway pythonGateway;
+
+    @Mock
+    private ProjectMapper projectMapper;
+
+    @Mock
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Mock
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Test
+    public void testGetCodeAndVersion() throws 
CodeGenerateUtils.CodeGenerateException {
+        Project project = getTestProject();
+        
Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
+
+        ProcessDefinition processDefinition = getTestProcessDefinition();
+        
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), 
processDefinition.getName())).thenReturn(processDefinition);
+
+        TaskDefinition taskDefinition = getTestTaskDefinition();
+        Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), 
processDefinition.getCode(), 
taskDefinition.getName())).thenReturn(taskDefinition);
+
+        Map<String, Long> result = 
pythonGateway.getCodeAndVersion(project.getName(), processDefinition.getName(), 
taskDefinition.getName());
+        Assert.assertEquals(result.get("code").longValue(), 
taskDefinition.getCode());
+    }
+
+    @Test
+    public void testGetDependentInfo() {
+        Project project = getTestProject();
+        
Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
+
+        ProcessDefinition processDefinition = getTestProcessDefinition();
+        
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), 
processDefinition.getName())).thenReturn(processDefinition);
+
+        TaskDefinition taskDefinition = getTestTaskDefinition();
+        Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), 
processDefinition.getCode(), 
taskDefinition.getName())).thenReturn(taskDefinition);
+
+        Map<String, Object> result = 
pythonGateway.getDependentInfo(project.getName(), processDefinition.getName(), 
taskDefinition.getName());
+        Assert.assertEquals((long) result.get("taskDefinitionCode"), 
taskDefinition.getCode());
+    }
+
+    private Project getTestProject() {
+        Project project = new Project();
+        project.setName("ut-project");
+        project.setUserId(111);
+        project.setCode(1L);
+        project.setCreateTime(new Date());
+        project.setUpdateTime(new Date());
+        return project;
+    }
+
+    private ProcessDefinition getTestProcessDefinition() {
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setCode(1L);
+        processDefinition.setName("ut-process-definition");
+        processDefinition.setProjectCode(1L);
+        processDefinition.setUserId(111);
+        processDefinition.setUpdateTime(new Date());
+        processDefinition.setCreateTime(new Date());
+        return processDefinition;
+    }
+
+    private TaskDefinition getTestTaskDefinition() {
+        TaskDefinition taskDefinition = new TaskDefinition();
+        taskDefinition.setCode(888888L);
+        taskDefinition.setName("ut-task-definition");
+        taskDefinition.setProjectCode(1L);
+        taskDefinition.setTaskType("SHELL");
+        taskDefinition.setUserId(111);
+        taskDefinition.setResourceIds("1");
+        taskDefinition.setWorkerGroup("default");
+        taskDefinition.setEnvironmentCode(1L);
+        taskDefinition.setVersion(1);
+        taskDefinition.setCreateTime(new Date());
+        taskDefinition.setUpdateTime(new Date());
+        return taskDefinition;
+    }
+
+}
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index beea8cc675..6a969b182b 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -146,7 +146,7 @@ public class TaskDefinitionServiceImplTest {
     public void queryTaskDefinitionByName() {
         String taskName = "task";
         long projectCode = 1L;
-
+        long processCode = 1L;
         Project project = getProject(projectCode);
         
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
 
@@ -158,11 +158,11 @@ public class TaskDefinitionServiceImplTest {
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, 
projectCode,TASK_DEFINITION )).thenReturn(result);
 
-        Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), 
taskName))
+        Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), 
processCode, taskName))
             .thenReturn(new TaskDefinition());
 
         Map<String, Object> relation = taskDefinitionService
-            .queryTaskDefinitionByName(loginUser, projectCode, taskName);
+            .queryTaskDefinitionByName(loginUser, projectCode, processCode, 
taskName);
 
         Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
     }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
index 304e623a0a..c426da6b04 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
@@ -41,10 +41,12 @@ public interface TaskDefinitionMapper extends 
BaseMapper<TaskDefinition> {
      * query task definition by name
      *
      * @param projectCode projectCode
+     * @param processCode processCode
      * @param name name
      * @return task definition
      */
     TaskDefinition queryByName(@Param("projectCode") long projectCode,
+                               @Param("processCode") long processCode,
                                @Param("name") String name);
 
     /**
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index c78dd5a449..b8c49faa3a 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -23,12 +23,24 @@
         worker_group, environment_code, fail_retry_times, fail_retry_interval, 
timeout_flag, timeout_notify_strategy, timeout, delay_time,
         resource_ids, create_time, update_time, 
task_group_id,task_group_priority, cpu_quota, memory_max
     </sql>
+    <sql id="baseSqlV2">
+        ${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, 
${alias}.description, ${alias}.project_code, ${alias}.user_id,
+        ${alias}.task_type, ${alias}.task_params, ${alias}.flag, 
${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
+        ${alias}.fail_retry_times, ${alias}.fail_retry_interval, 
${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
+        ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, 
${alias}.update_time, ${alias}.task_group_id,
+        ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max
+    </sql>
     <select id="queryByName" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
         select
-        <include refid="baseSql"/>
-        from t_ds_task_definition
-        WHERE project_code = #{projectCode}
-        and name = #{name}
+        <include refid="baseSqlV2">
+            <property name="alias" value="td"/>
+        </include>
+        from t_ds_task_definition td
+        join t_ds_process_task_relation ptr on ptr.project_code = 
td.project_code
+        where td.project_code = #{projectCode}
+        and td.name = #{name}
+        and ptr.process_definition_code = #{processCode}
+        and td.code = ptr.post_task_code
     </select>
     <select id="queryAllDefinitionList" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
         select
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
index 757f751dd2..3d16e2e44d 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.BaseDaoTest;
 import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.User;
 
@@ -36,6 +37,9 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
     @Autowired
     private TaskDefinitionMapper taskDefinitionMapper;
 
+    @Autowired
+    private ProcessTaskRelationMapper processTaskRelationMapper;
+
     @Autowired
     private UserMapper userMapper;
 
@@ -60,6 +64,24 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
         return taskDefinition;
     }
 
+    /**
+     * insert a process task relation whose post task is the given task code
+     *
+     * @return ProcessTaskRelation
+     */
+    private ProcessTaskRelation insertTaskRelation(long postTaskCode) {
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setName("def 1");
+        processTaskRelation.setProjectCode(1L);
+        processTaskRelation.setProcessDefinitionCode(1L);
+        processTaskRelation.setPostTaskCode(postTaskCode);
+        processTaskRelation.setPreTaskCode(0L);
+        processTaskRelation.setUpdateTime(new Date());
+        processTaskRelation.setCreateTime(new Date());
+        processTaskRelationMapper.insert(processTaskRelation);
+        return processTaskRelation;
+    }
+
     @Test
     public void testInsert() {
         TaskDefinition taskDefinition = insertOne();
@@ -69,7 +91,8 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
     @Test
     public void testQueryByDefinitionName() {
         TaskDefinition taskDefinition = insertOne();
-        TaskDefinition result = 
taskDefinitionMapper.queryByName(taskDefinition.getProjectCode()
+        ProcessTaskRelation processTaskRelation = 
insertTaskRelation(taskDefinition.getCode());
+        TaskDefinition result = 
taskDefinitionMapper.queryByName(taskDefinition.getProjectCode(), 
processTaskRelation.getProcessDefinitionCode()
                 , taskDefinition.getName());
 
         Assert.assertNotNull(result);
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 599b979369..08a1cdf7e8 100644
--- 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -273,7 +273,7 @@ class Task(Base):
         # TODO get code from specific project process definition and task name
         gateway = launch_gateway()
         result = gateway.entry_point.getCodeAndVersion(
-            self.process_definition._project, self.name
+            self.process_definition._project, self.process_definition.name, 
self.name
         )
         # result = 
gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)

Reply via email to