zhongjiajie commented on a change in pull request #7022:
URL: https://github.com/apache/dolphinscheduler/pull/7022#discussion_r759893805



##########
File path: 
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task sub_process."""
+
+from typing import Dict, Optional
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault, TaskType
+from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class SubProcessTaskParams(TaskParams):
+    """Parameter only for Sub Process task type."""
+
+    def __init__(self, process_definition_code, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.process_definition_code = process_definition_code
+
+
+class SubProcess(Task):
+    """Task SubProcess object, declare behavior for SubProcess task to 
dolphinscheduler."""
+
+    def __init__(
+        self,
+        name: str,
+        process_definition_name: str,
+        user: Optional[str] = ProcessDefinitionDefault.USER,
+        project_name: Optional[str] = ProcessDefinitionDefault.PROJECT,

Review comment:
       I check out DS web definition and found out task `sub process` could 
only use workflow process definition. As workflow-as-code we already declare  
`project` and `user`, so maybe we should found a way to get attribute `user` 
and `project` from `ProcessDefinition`
   
   Maybe at the end it would look like 
   
   ```py
   with ProcessDefinition(
       "test-pd-name",
       user="specific-user",
       project="specific-project",
   ) as pd:
       sub_process = SubProcess(
           name = "su-process-task-name",
           process_definition_name = "sub-process-definition-name"
       )
   ```
   
   and task `sub_process` could get `user='specific-user'` and 
`project='specific-project'`. WDYT

##########
File path: 
dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
##########
@@ -391,6 +391,42 @@ public void createUser(String userName,
         return result;
     }
 
+    /**
+     * Get processDefinition by given processDefinitionName name. It return 
map contain processDefinition id, name, code.
+     * Useful in Python API create subProcess task which need 
processDefinition information.
+     *
+     * @param userName              user who create or update schedule
+     * @param projectName           project name which process definition 
belongs to
+     * @param processDefinitionName process definition name
+     */
+    public Map<String, Object> getProcessDefinitionInfo(String userName, 
String projectName, String processDefinitionName) {

Review comment:
       I found this function is most likely to function 
`createOrUpdateProcessDefinition` in PythonGatewayServer. Could you add a 
private function combine your function and function 
`createOrUpdateProcessDefinition` define?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to