------------------ Original Message ------------------
From: "dev" <[email protected]>
Date: Thursday, September 30, 2021, 10:36
To: "dev" <[email protected]>
Subject: Re: [PROPOSAL] Add Python API implementation of workflows-as-code






---Original Message---
From: "zhang junfan" <[email protected]>
Date: Thursday, September 30, 2021, 10:35
To: "[email protected]" <[email protected]>
Subject: Re: [PROPOSAL] Add Python API implementation of workflows-as-code


Good job, and thanks for focusing on multi-language support.

Two minor discussion points:

1. Could you please provide some Spark/Flink process examples?
2. I'm a little confused by workflow-as-code: do you mean it only defines the DAG and workflow parameters? Could we combine the workflow with the user's task code (like Spark/Flink programs)?

________________________________
From: Jiajie Zhong <[email protected]>
Date: September 28, 2021, 11:42
To: [email protected] <[email protected]>
Subject: [PROPOSAL] Add Python API implementation of workflows-as-code

Hey guys,

    Apache DolphinScheduler is a good tool for workflow scheduling: it is easy to extend, distributed, and has a nice UI for creating and maintaining workflows. Today our workflows can only be defined in the UI, which is easy to use and user friendly, but it could be better still if we added an extension API so that workflows could also be defined as code or as YAML files. And since YAML files are hard to maintain by hand, I think it is better to define workflows in code, a.k.a. workflows-as-code.

    When workflow definitions are code, we can easily modify configuration and make batch changes to workflows. Similar tasks become easy to define with a loop statement, and it also gives us the ability to write unit tests for workflows. I hope Apache DolphinScheduler can combine the benefits of defining workflows by code and by UI, so I am raising this proposal to add workflows-as-code to Apache DolphinScheduler.
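To make the loop and unit-test benefits concrete, here is a minimal sketch using a hypothetical `TaskSpec` stand-in (plain Python, not the real pydolphinscheduler classes, which are still in the POC PR):

```python
# Sketch of the "define similar tasks in a loop" benefit.
# TaskSpec is a hypothetical stand-in, not a real pydolphinscheduler class.
from dataclasses import dataclass, field
from typing import List


@dataclass
class TaskSpec:
    name: str
    command: str
    downstream: List["TaskSpec"] = field(default_factory=list)

    def set_downstream(self, tasks: List["TaskSpec"]) -> None:
        self.downstream.extend(tasks)


# Batch-create ten similar tasks with one loop instead of ten UI forms.
extract = TaskSpec(name="extract", command="echo extract")
loaders = [
    TaskSpec(name=f"load_part_{i}", command=f"echo 'load part {i}'")
    for i in range(10)
]
extract.set_downstream(loaders)

# Because the workflow is plain code, it can be unit-tested:
assert len(extract.downstream) == 10
assert extract.downstream[0].name == "load_part_0"
```

Renaming all ten loader tasks, or growing them to a hundred, is then a one-line change rather than a manual edit of each definition.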

    Actually, I have already started on this with a POC PR[1]. In this PR I add a Python API that lets users define workflows in Python code. The feature uses *Py4J* to connect Java and Python, which means I did not add any new database model or infrastructure to Apache DolphinScheduler; I simply reuse the service layer in the dolphinscheduler-api package to create workflows. We can consider the Python API just another interface to Apache DolphinScheduler, like our UI, that lets users define and maintain workflows by its rules.

    Here is a tutorial workflow defined with the Python API, which you can find in the PR files[2]:

```python
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell

with ProcessDefinition(name="tutorial") as pd:
    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
    task_union = Shell(name="task_union", command="echo union")

    task_group = [task_child_one, task_child_two]
    task_parent.set_downstream(task_group)

    task_union << task_group

    pd.run()
```

    In the tutorial we define a new ProcessDefinition named "tutorial" using a Python context manager, and then we add four Shell tasks to "tutorial"; in just five lines we create one process definition with four tasks.
    Besides the process definition and the tasks, the other thing we have to add to a workflow is task dependencies. We add the functions `set_downstream` and `set_upstream` to describe dependencies, and we also overload the bitshift operators so that `>>` and `<<` work as shortcuts.
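The `>>`/`<<` shortcut idea can be sketched with Python's `__rshift__`/`__lshift__` hooks; this toy `Task` class is only an illustration of the technique, not the actual pydolphinscheduler implementation:

```python
# Toy illustration of setting task dependencies with overloaded
# bitshift operators (not the real pydolphinscheduler code).
class Task:
    def __init__(self, name):
        self.name = name
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, tasks):
        # Accept a single task or a list of tasks.
        for t in tasks if isinstance(tasks, list) else [tasks]:
            self.downstream.add(t)
            t.upstream.add(self)

    def set_upstream(self, tasks):
        for t in tasks if isinstance(tasks, list) else [tasks]:
            self.upstream.add(t)
            t.downstream.add(self)

    def __rshift__(self, other):
        # task_a >> task_b: task_b runs after task_a.
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # task_a << task_b: task_a runs after task_b.
        self.set_upstream(other)
        return other


parent = Task("parent")
child = Task("child")
parent >> child
assert child in parent.downstream and parent in child.upstream
```

Returning `other` from `__rshift__` is what lets chains like `a >> b >> c` read left to right as an execution order.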
    Once the dependencies are set, our workflow definition is done, but the entire definition lives on the Python API side, which means it is not persisted to the Apache DolphinScheduler database and cannot be run by Apache DolphinScheduler until you declare `pd.submit()` or run it directly with `pd.run()`.


[1]: https://github.com/apache/dolphinscheduler/pull/6269
[2]: https://github.com/apache/dolphinscheduler/pull/6269/files#diff-5561fec6b57cc611bee2b0d8f030965d76bdd202801d9f8a1e2e74c21769bc41


Best Wishes
-- Jiajie
