This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new dd6ed36  Add Python API implementation of workflows-as-code (#6269)
dd6ed36 is described below

commit dd6ed36f65d2add3bf8e31cad24ab25f4606c9d9
Author: Jiajie Zhong <[email protected]>
AuthorDate: Sun Oct 31 20:35:46 2021 +0800

    Add Python API implementation of workflows-as-code (#6269)
    
    * Init DS python SDK pydolphinscheduler: python code definition
    
    * Doc first
    
    * Add quick start and developer doc
    
    * Java documentation change
    
    * Add LICENSE-py4j.txt
    
    * Add py4j to release-docs/LICENSE
    
    * Move dependency version to parent pom
    
    * Remove outdated code
    
    * Add tenant parameter to tutorial
---
 .gitignore                                         |   5 +
 .../api/service/ProjectService.java                |   9 +
 .../dolphinscheduler/api/service/QueueService.java |   8 +
 .../api/service/TenantService.java                 |  16 ++
 .../api/service/impl/ProjectServiceImpl.java       |  15 +
 .../api/service/impl/QueueServiceImpl.java         |  26 ++
 .../api/service/impl/TenantServiceImpl.java        |  19 +-
 .../apache/dolphinscheduler/common/Constants.java  |   1 +
 .../dolphinscheduler/dao/mapper/QueueMapper.java   |   7 +
 .../dolphinscheduler/dao/mapper/QueueMapper.xml    |   9 +
 dolphinscheduler-dist/release-docs/LICENSE         |   2 +-
 .../release-docs/licenses/LICENSE-py4j.txt         |  26 ++
 dolphinscheduler-python/pom.xml                    |  60 ++++
 .../pydolphinscheduler/README.md                   | 103 +++++++
 .../pydolphinscheduler/ROADMAP.md                  |  34 +++
 .../pydolphinscheduler/examples/tutorial.py        |  43 +++
 .../pydolphinscheduler/requirements.txt            |  18 ++
 .../pydolphinscheduler/requirements_dev.txt        |  24 ++
 .../pydolphinscheduler/setup.cfg                   |  16 ++
 .../pydolphinscheduler/setup.py                    |  90 ++++++
 .../pydolphinscheduler/src/__init__.py             |  16 ++
 .../src/pydolphinscheduler/__init__.py             |  16 ++
 .../src/pydolphinscheduler/constants.py            |  74 +++++
 .../src/pydolphinscheduler/core/__init__.py        |  16 ++
 .../src/pydolphinscheduler/core/base.py            |  72 +++++
 .../src/pydolphinscheduler/core/base_side.py       |  43 +++
 .../pydolphinscheduler/core/process_definition.py  | 249 +++++++++++++++++
 .../src/pydolphinscheduler/core/task.py            | 237 ++++++++++++++++
 .../src/pydolphinscheduler/java_gateway.py         |  43 +++
 .../src/pydolphinscheduler/side/__init__.py        |  20 ++
 .../src/pydolphinscheduler/side/project.py         |  45 +++
 .../src/pydolphinscheduler/side/queue.py           |  44 +++
 .../src/pydolphinscheduler/side/tenant.py          |  45 +++
 .../src/pydolphinscheduler/side/user.py            |  68 +++++
 .../src/pydolphinscheduler/side/worker_group.py    |  35 +++
 .../src/pydolphinscheduler/tasks/__init__.py       |  16 ++
 .../src/pydolphinscheduler/tasks/shell.py          |  34 +++
 .../src/pydolphinscheduler/utils/__init__.py       |  16 ++
 .../src/pydolphinscheduler/utils/string.py         |  30 ++
 .../pydolphinscheduler/test/__init__.py            |  16 ++
 .../pydolphinscheduler/test/core/__init__.py       |  16 ++
 .../test/core/test_process_definition.py           | 118 ++++++++
 .../pydolphinscheduler/test/core/test_task.py      |  96 +++++++
 .../pydolphinscheduler/test/example/__init__.py    |  16 ++
 .../pydolphinscheduler/test/tasks/__init__.py      |  16 ++
 .../pydolphinscheduler/test/tasks/test_shell.py    |  61 ++++
 .../pydolphinscheduler/test/test_java_gateway.py   |  46 +++
 .../server/PythonGatewayServer.java                | 310 +++++++++++++++++++++
 dolphinscheduler-standalone-server/pom.xml         |   4 +
 .../dolphinscheduler/server/StandaloneServer.java  |   3 +-
 pom.xml                                            |  13 +
 tools/dependencies/known-dependencies.txt          |   1 +
 52 files changed, 2363 insertions(+), 3 deletions(-)

diff --git a/.gitignore b/.gitignore
index 9011db1..3dccfdf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,8 @@ dolphinscheduler-server/src/main/resources/logback.xml
 dolphinscheduler-ui/dist
 dolphinscheduler-ui/node
 docker/build/apache-dolphinscheduler*
+
+# pydolphinscheduler
+__pycache__/
+build/
+*egg-info/
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
index dffa866..df05dee 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
@@ -47,6 +47,15 @@ public interface ProjectService {
     Map<String, Object> queryByCode(User loginUser, long projectCode);
 
     /**
+     * query project details by name
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @return project detail information
+     */
+    Map<String, Object> queryByName(User loginUser, String projectName);
+
+    /**
      * check project and authorization
      *
      * @param loginUser login user
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
index 7013520..f978b96 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
@@ -76,4 +76,12 @@ public interface QueueService {
      */
     Result<Object> verifyQueue(String queue, String queueName);
 
+    /**
+     * query queue by queueName
+     *
+     * @param queueName queue name
+     * @return queue object for provide queue name
+     */
+    Map<String, Object> queryQueueName(String queueName);
+
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
index 30d98a1..47a4082 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
@@ -92,4 +92,20 @@ public interface TenantService {
      * @return true if tenant code can user, otherwise return false
      */
     Result verifyTenantCode(String tenantCode);
+
+    /**
+     * check if provide tenant code object exists
+     *
+     * @param tenantCode tenant code
+     * @return true if tenant code exists, false if not
+     */
+    boolean checkTenantExists(String tenantCode);
+
+    /**
+     * query tenant by tenant code
+     *
+     * @param tenantCode tenant code
+     * @return tenant detail information
+     */
+    Map<String, Object> queryByTenantCode(String tenantCode);
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7d49cfa..9fc5aab 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -139,6 +139,21 @@ public class ProjectServiceImpl extends BaseServiceImpl 
implements ProjectServic
         return result;
     }
 
+    @Override
+    public Map<String, Object> queryByName(User loginUser, String projectName) 
{
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByName(projectName);
+        boolean hasProjectAndPerm = hasProjectAndPerm(loginUser, project, 
result);
+        if (!hasProjectAndPerm) {
+            return result;
+        }
+        if (project != null) {
+            result.put(Constants.DATA_LIST, project);
+            putMsg(result, Status.SUCCESS);
+        }
+        return result;
+    }
+
     /**
      * check project and authorization
      *
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index edd0e1d..e298833 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -263,6 +263,32 @@ public class QueueServiceImpl extends BaseServiceImpl 
implements QueueService {
     }
 
     /**
+     * query queue by queueName
+     *
+     * @param queueName queue name
+     * @return queue object for provide queue name
+     */
+    @Override
+    public Map<String, Object> queryQueueName(String queueName) {
+        Map<String, Object> result = new HashMap<>();
+
+        if (StringUtils.isEmpty(queueName)) {
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, 
Constants.QUEUE_NAME);
+            return result;
+        }
+
+        if (!checkQueueNameExist(queueName)) {
+            putMsg(result, Status.QUEUE_NOT_EXIST, queueName);
+            return result;
+        }
+
+        List<Queue> queueList = queueMapper.queryQueueName(queueName);
+        result.put(Constants.DATA_LIST, queueList);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
      * check queue exist
      * if exists return true,not exists return false
      * check queue exist
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index fc01bd7..58ab865 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -320,8 +320,25 @@ public class TenantServiceImpl extends BaseServiceImpl 
implements TenantService
      * @param tenantCode tenant code
      * @return ture if the tenant code exists, otherwise return false
      */
-    private boolean checkTenantExists(String tenantCode) {
+    public boolean checkTenantExists(String tenantCode) {
         Boolean existTenant = tenantMapper.existTenant(tenantCode);
         return existTenant == Boolean.TRUE;
     }
+
+    /**
+     * query tenant by tenant code
+     *
+     * @param tenantCode tenant code
+     * @return tenant detail information
+     */
+    @Override
+    public Map<String, Object> queryByTenantCode(String tenantCode) {
+        Map<String, Object> result = new HashMap<>();
+        Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+        if (tenant != null) {
+            result.put(Constants.DATA_LIST, tenant);
+            putMsg(result, Status.SUCCESS);
+        }
+        return result;
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index e662347..fc9e36f 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -415,6 +415,7 @@ public final class Constants {
     public static final String NULL = "NULL";
     public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
     public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
+    public static final String THREAD_NAME_GATEWAY_SERVER = "Gateway-Server";
 
     /**
      * command parameter keys
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java
index 027bfd2..e486070 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java
@@ -53,4 +53,11 @@ public interface QueueMapper extends BaseMapper<Queue> {
      * @return true if exist else return null
      */
     Boolean existQueue(@Param("queue") String queue, @Param("queueName") 
String queueName);
+
+    /**
+     * query queue by queue name
+     * @param queueName queueName
+     * @return queue list
+     */
+    List<Queue> queryQueueName(@Param("queueName") String queueName);
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml
index 0d75c6e..cf381e4 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml
@@ -54,4 +54,13 @@
             and queue_name =#{queueName}
         </if>
     </select>
+    <select id="queryQueueName" 
resultType="org.apache.dolphinscheduler.dao.entity.Queue">
+        select
+        <include refid="baseSql"/>
+        from t_ds_queue
+        where 1 = 1
+        <if test="queueName != null and queueName != ''">
+            and queue_name =#{queueName}
+        </if>
+    </select>
 </mapper>
diff --git a/dolphinscheduler-dist/release-docs/LICENSE 
b/dolphinscheduler-dist/release-docs/LICENSE
index 63ab3f6..937216f 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -436,7 +436,7 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
     threetenbp 1.3.6: 
https://mvnrepository.com/artifact/org.threeten/threetenbp/1.3.6,  BSD 3-clause
     xmlenc 0.52: https://mvnrepository.com/artifact/xmlenc/xmlenc/0.52, BSD
     hamcrest-core 1.3: 
https://mvnrepository.com/artifact/org.hamcrest/hamcrest-core/1.3, BSD 2-Clause
-
+    py4j 0.10.9: https://mvnrepository.com/artifact/net.sf.py4j/py4j/0.10.9, 
BSD 2-clause
 
 ========================================================================
 CDDL licenses
diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt 
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt
new file mode 100644
index 0000000..2c7adb6
--- /dev/null
+++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt
@@ -0,0 +1,26 @@
+Copyright (c) 2009-2018, Barthelemy Dagenais and individual contributors. All
+rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+- Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+
+- Redistributions in binary form must reproduce the above copyright notice,
+  this list of conditions and the following disclaimer in the documentation
+  and/or other materials provided with the distribution.
+
+- The name of the author may not be used to endorse or promote products
+  derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/dolphinscheduler-python/pom.xml b/dolphinscheduler-python/pom.xml
new file mode 100644
index 0000000..ce7968c
--- /dev/null
+++ b/dolphinscheduler-python/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<modelVersion>4.0.0</modelVersion>
+<parent>
+    <groupId>org.apache.dolphinscheduler</groupId>
+    <artifactId>dolphinscheduler</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+</parent>
+<artifactId>dolphinscheduler-python</artifactId>
+<name>${project.artifactId}</name>
+<packaging>jar</packaging>
+
+<dependencies>
+    <!-- dolphinscheduler -->
+    <dependency>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-api</artifactId>
+    </dependency>
+
+    <!--springboot-->
+    <dependency>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-web</artifactId>
+        <exclusions>
+            <exclusion>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-tomcat</artifactId>
+            </exclusion>
+            <exclusion>
+                <artifactId>log4j-to-slf4j</artifactId>
+                <groupId>org.apache.logging.log4j</groupId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+
+    <dependency>
+        <groupId>net.sf.py4j</groupId>
+        <artifactId>py4j</artifactId>
+    </dependency>
+
+</dependencies>
+</project>
diff --git a/dolphinscheduler-python/pydolphinscheduler/README.md 
b/dolphinscheduler-python/pydolphinscheduler/README.md
new file mode 100644
index 0000000..a987483
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/README.md
@@ -0,0 +1,103 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# pydolphinscheduler
+
+pydolphinscheduler is a Python API for Apache DolphinScheduler, which allows you to define
+your workflows in Python code, aka workflow-as-code.
+
+## Quick Start
+
+> **_Notice:_** For now, because pydolphinscheduler has not been released as a binary tarball or on [PyPI][pypi], you
+> have to clone the Apache DolphinScheduler code from GitHub to complete the quick start setup.
+
+Here we show you how to install and run a simple example of pydolphinscheduler
+
+### Prepare
+
+```shell
+# Clone code from github
+git clone git@github.com:apache/dolphinscheduler.git
+
+# Install pydolphinscheduler from source
+cd dolphinscheduler-python/pydolphinscheduler
+python setup.py install
+```
+
+### Start Server And Run Example
+
+Before you run an example, you have to start the backend server. You can follow the [development setup][dev-setup]
+section "DolphinScheduler Standalone Quick Start" to set up a development environment. You have to start both the backend
+and frontend servers in this step, which means that you can view the DolphinScheduler UI in your browser at the URL
+http://localhost:12345/dolphinscheduler
+
+After the backend server has started, all requests from `pydolphinscheduler` will be sent to the backend server.
+Now we can run a simple example with:
+
+```shell
+cd dolphinscheduler-python/pydolphinscheduler
+python examples/tutorial.py
+```
+
+> **_NOTICE:_** Since Apache DolphinScheduler requires a tenant while running the command, you might need to change the
+> tenant value in `examples/tutorial.py`. For now the value is `tenant_exists`; please change it to a username that exists
+> in your environment.
+
+After the command executes, you can see a new project with a single process definition named *tutorial* in the [UI][ui-project].
+
+With that, we have finished the quick start by running an example of pydolphinscheduler. If you want to inspect or join
+pydolphinscheduler development, take a look at [develop](#develop).
+
+## Develop
+
+pydolphinscheduler is a Python API for Apache DolphinScheduler; it only defines what a workflow looks like and does not
+store or execute it. We use [py4j][py4j] to dynamically access the Java Virtual Machine.
+
+### Setup Develop Environment
+
+We already cloned the code in [quick start](#quick-start), so the next step is to open the pydolphinscheduler project
+in your editor. We recommend you use [pycharm][pycharm] instead of [IntelliJ IDEA][idea] to open it. And you can
+just open the directory `dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`.
+
+### Brief Concept
+
+Apache DolphinScheduler is designed to define workflows through the UI, and pydolphinscheduler tries to define them by code. When
+defining by code, users usually do not care whether the user, tenant, or queue exists or not. All they care about is creating
+a new workflow from the code they define. So we have some **side objects** in the `pydolphinscheduler/side`
+directory; they only check whether an object exists, and create it if it does not.
+
+#### Process Definition
+
+The pydolphinscheduler workflow object. Process definition is the same name as the corresponding Java object (it may be changed to
+another word for simplicity).
+
+#### Tasks
+
+pydolphinscheduler task objects. We use tasks to define the exact jobs we want DolphinScheduler to do for us. For now,
+we only support the `shell` task to execute shell commands. [This link][all-task] lists all tasks supported in DolphinScheduler,
+which will be implemented in the future.
+
+
+[pypi]: https://pypi.org/
+[dev-setup]: 
https://dolphinscheduler.apache.org/en-us/development/development-environment-setup.html
+[ui-project]: http://localhost:12345/dolphinscheduler/ui/#/projects/list
+[py4j]: https://www.py4j.org/index.html
+[pycharm]: https://www.jetbrains.com/pycharm
+[idea]: https://www.jetbrains.com/idea/
+[all-task]: 
https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/shell.html
diff --git a/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md 
b/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md
new file mode 100644
index 0000000..32ad5e2
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Roadmap
+
+### v0.0.3
+
+Add other features, tasks, parameters in DS, keep code coverage up to 90%
+
+### v0.0.2
+
+Add docs about how to use and develop package, code coverage up to 90%, add 
CI/CD
+for package
+
+### v0.0.1(current)
+
+Set up a POC for defining DAGs with Python code, running DAGs manually, and
+releasing to PyPI
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py 
b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
new file mode 100644
index 0000000..7756534
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+After the tutorial.py file is submitted to the Apache DolphinScheduler server, a DAG will be created,
+and the workflow DAG graph is as below:
+
+                  --> task_child_one
+                /                    \
+task_parent -->                        -->  task_union
+                \                   /
+                  --> task_child_two
+"""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.shell import Shell
+
+with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd:
+    task_parent = Shell(name="task_parent", command="echo hello 
pydolphinscheduler")
+    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
+    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
+    task_union = Shell(name="task_union", command="echo union")
+
+    task_group = [task_child_one, task_child_two]
+    task_parent.set_downstream(task_group)
+
+    task_union << task_group
+
+    pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements.txt 
b/dolphinscheduler-python/pydolphinscheduler/requirements.txt
new file mode 100644
index 0000000..cdec3ca
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/requirements.txt
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+py4j~=0.10.9.2
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt 
b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
new file mode 100644
index 0000000..be98ce9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# testing
+pytest~=6.2.5
+# code linting and formatting
+flake8-black~=0.2.3
+# flake8
+# flake8-docstrings
+# flake8-black
diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.cfg 
b/dolphinscheduler-python/pydolphinscheduler/setup.cfg
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/setup.cfg
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py 
b/dolphinscheduler-python/pydolphinscheduler/setup.py
new file mode 100644
index 0000000..8e9cea4
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/setup.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import sys
+from os.path import dirname, join
+
+from setuptools import find_packages, setup
+
+version = '0.0.1.dev0'
+
+if sys.version_info[0] < 3:
+    raise Exception("pydolphinscheduler does not support Python 2. Please 
upgrade to Python 3.")
+
+
+def read(*names, **kwargs):
+    """
+    Return the text content of a file addressed relative to this script's directory
+
+    :param names: path segments joined onto the directory of this file
+    :param kwargs: only ``encoding`` is read, it defaults to ``utf8``
+    :return: whole file content as a string
+    """
+    path = join(dirname(__file__), *names)
+    # Context manager closes the handle; the bare open(...).read() left it to the garbage collector
+    with open(path, encoding=kwargs.get("encoding", "utf8")) as handle:
+        return handle.read()
+
+
+setup(
+    name="pydolphinscheduler",
+    version=version,
+    license="Apache License 2.0",
+    description="Apache DolphinScheduler python SDK",
+    long_description=read("README.md"),
+    # Make sure pypi is expecting markdown
+    long_description_content_type="text/markdown",
+    author="Apache Software Foundation",
+    author_email="[email protected]",
+    url="https://dolphinscheduler.apache.org/",
+    python_requires=">=3.6",
+    keywords=[
+        "dolphinscheduler",
+        "workflow",
+        "scheduler",
+        "taskflow",
+    ],
+    project_urls={
+        "Homepage": "https://dolphinscheduler.apache.org",
+        "Documentation": "https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/quick-start.html",
+        "Source": "https://github.com/apache/dolphinscheduler",
+        "Issue Tracker": "https://github.com/apache/dolphinscheduler/issues",
+        "Discussion": "https://github.com/apache/dolphinscheduler/discussions",
+        "Twitter": "https://twitter.com/dolphinschedule",
+    },
+    packages=find_packages(where="src"),
+    package_dir={"": "src"},
+    include_package_data=True,
+    classifiers=[
+        # complete classifier list: http://pypi.python.org/pypi?%3Aaction=list_classifiers
+        "Development Status :: 1 - Planning",
+        "Environment :: Console",
+        "Intended Audience :: Developers",
+        "License :: OSI Approved :: Apache Software License",
+        "Operating System :: Unix",
+        "Operating System :: POSIX",
+        "Operating System :: Microsoft :: Windows",
+        "Programming Language :: Python",
+        "Programming Language :: Python :: 3",
+        "Programming Language :: Python :: 3.6",
+        "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: Implementation :: CPython",
+        "Programming Language :: Python :: Implementation :: PyPy",
+        "Topic :: Software Development :: User Interfaces",
+    ],
+    # Runtime dependencies only
+    install_requires=[
+        "py4j~=0.10",
+    ],
+    # Test tooling is opt-in (pip install "pydolphinscheduler[dev]") instead of being
+    # forced onto every user of the SDK
+    extras_require={
+        "dev": [
+            "pytest~=6.2",
+        ],
+    },
+)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/__init__.py 
b/dolphinscheduler-python/pydolphinscheduler/src/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
new file mode 100644
index 0000000..eda07aa
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class ProcessDefinitionReleaseState:
+    """
+    ProcessDefinition release state
+    """
+    ONLINE: str = "ONLINE"
+    OFFLINE: str = "OFFLINE"
+
+
+class ProcessDefinitionDefault:
+    """
+    ProcessDefinition default values
+    """
+    PROJECT: str = "project-pydolphin"
+    TENANT: str = "tenant_pydolphin"
+    USER: str = "userPythonGateway"
+    # TODO simple set password same as username
+    USER_PWD: str = "userPythonGateway"
+    USER_EMAIL: str = "[email protected]"
+    USER_PHONE: str = "11111111111"
+    USER_STATE: int = 1
+    QUEUE: str = "queuePythonGateway"
+    WORKER_GROUP: str = "default"
+
+
+class TaskPriority(str):
+    HIGHEST = "HIGHEST"
+    HIGH = "HIGH"
+    MEDIUM = "MEDIUM"
+    LOW = "LOW"
+    LOWEST = "LOWEST"
+
+
+class TaskFlag(str):
+    YES = "YES"
+    NO = "NO"
+
+
+class TaskTimeoutFlag(str):
+    CLOSE = "CLOSE"
+
+
+class TaskType(str):
+    SHELL = "SHELL"
+
+
+class DefaultTaskCodeNum(str):
+    DEFAULT = 1
+
+
+class JavaGatewayDefault(str):
+    RESULT_MESSAGE_KEYWORD = "msg"
+    RESULT_MESSAGE_SUCCESS = "success"
+
+    RESULT_STATUS_KEYWORD = "status"
+    RESULT_STATUS_SUCCESS = "SUCCESS"
+
+    RESULT_DATA = "data"
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
new file mode 100644
index 0000000..175754f
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional, Dict
+
+# from pydolphinscheduler.side.user import User
+from pydolphinscheduler.utils.string import attr2camel
+
+
+class Base:
+    """
+    Base
+    """
+
+    _KEY_ATTR: set = {
+        "name",
+        "description"
+    }
+
+    _TO_DICT_ATTR: set = set()
+
+    DEFAULT_ATTR: Dict = {}
+
+    def __init__(
+            self,
+            name: str,
+            description: Optional[str] = None
+    ):
+        self.name = name
+        self.description = description
+
+    def __repr__(self) -> str:
+        return f'<{type(self).__name__}: name="{self.name}">'
+
+    def __eq__(self, other):
+        return type(self) == type(other) and \
+               all(getattr(self, a, None) == getattr(other, a, None) for a in 
self._KEY_ATTR)
+
+    # TODO check how Redash do
+    # TODO DRY
+    def to_dict(self, camel_attr=True) -> Dict:
+        """
+        Collect the attributes listed in ``_TO_DICT_ATTR`` into a dict
+
+        :param camel_attr: when true every key is passed through ``attr2camel``,
+            otherwise the attribute name itself is used as key
+        :return: key to attribute value mapping, with ``None`` for attributes the
+            instance does not have
+        """
+        return {
+            (attr2camel(key) if camel_attr else key): getattr(self, key, None)
+            for key in self._TO_DICT_ATTR
+        }
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
new file mode 100644
index 0000000..cf0f14e
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core.base import Base
+
+
+class BaseSide(Base):
+    def __init__(
+            self,
+            name: str,
+            description: Optional[str] = None
+    ):
+        super().__init__(name, description)
+
+    @classmethod
+    def create_if_not_exists(
+            cls,
+            # TODO comment for avoiding cycle import
+            # user: Optional[User] = ProcessDefinitionDefault.USER
+            user=ProcessDefinitionDefault.USER
+    ):
+        """
+        Create Base if not exists
+        """
+
+        raise NotImplementedError
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
new file mode 100644
index 0000000..8d295d0
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -0,0 +1,249 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional, List, Dict, Set
+
+from pydolphinscheduler.constants import ProcessDefinitionReleaseState, 
ProcessDefinitionDefault
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.side import Tenant, Project, User
+
+
+class ProcessDefinitionContext:
+    _context_managed_process_definition: Optional["ProcessDefinition"] = None
+
+    @classmethod
+    def set(cls, pd: "ProcessDefinition") -> None:
+        cls._context_managed_process_definition = pd
+
+    @classmethod
+    def get(cls) -> Optional["ProcessDefinition"]:
+        return cls._context_managed_process_definition
+
+    @classmethod
+    def delete(cls) -> None:
+        cls._context_managed_process_definition = None
+
+
+class ProcessDefinition(Base):
+    """
+    ProcessDefinition
+    TODO :ref: comment may not correct ref
+    TODO: maybe we should rename this class, currently use DS object name
+    """
+
+    # key attribute for identify ProcessDefinition object
+    _KEY_ATTR = {
+        "name",
+        "project",
+        "tenant",
+        "release_state",
+        "param",
+    }
+
+    _TO_DICT_ATTR = {
+        "name",
+        "description",
+        "_project",
+        "_tenant",
+        "timeout",
+        "release_state",
+        "param",
+        "tasks",
+        "task_definition_json",
+        "task_relation_json",
+    }
+
+    def __init__(
+            self,
+            name: str,
+            description: Optional[str] = None,
+            user: Optional[str] = ProcessDefinitionDefault.USER,
+            project: Optional[str] = ProcessDefinitionDefault.PROJECT,
+            tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
+            queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
+            timeout: Optional[int] = 0,
+            release_state: Optional[str] = 
ProcessDefinitionReleaseState.ONLINE,
+            param: Optional[List] = None
+    ):
+        super().__init__(name, description)
+        self._user = user
+        self._project = project
+        self._tenant = tenant
+        self._queue = queue
+        self.timeout = timeout
+        self.release_state = release_state
+        self.param = param
+        self.tasks: dict = {}
+        # TODO how to fix circle import
+        self._task_relations: set["TaskRelation"] = set()
+        self._process_definition_code = None
+
+    def __enter__(self) -> "ProcessDefinition":
+        ProcessDefinitionContext.set(self)
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+        ProcessDefinitionContext.delete()
+
+    @property
+    def tenant(self) -> Tenant:
+        return Tenant(self._tenant)
+
+    @tenant.setter
+    def tenant(self, tenant: Tenant) -> None:
+        self._tenant = tenant.name
+
+    @property
+    def project(self) -> Project:
+        return Project(self._project)
+
+    @project.setter
+    def project(self, project: Project) -> None:
+        self._project = project.name
+
+    @property
+    def user(self) -> User:
+        return User(self._user,
+                    ProcessDefinitionDefault.USER_PWD,
+                    ProcessDefinitionDefault.USER_EMAIL,
+                    ProcessDefinitionDefault.USER_PHONE,
+                    ProcessDefinitionDefault.TENANT,
+                    ProcessDefinitionDefault.QUEUE,
+                    ProcessDefinitionDefault.USER_STATE)
+
+    @property
+    def task_definition_json(self) -> List[Dict]:
+        if not self.tasks:
+            return [self.tasks]
+        else:
+            return [task.to_dict() for task in self.tasks.values()]
+
+    @property
+    def task_relation_json(self) -> List[Dict]:
+        if not self.tasks:
+            return [self.tasks]
+        else:
+            self._handle_root_relation()
+            return [tr.to_dict() for tr in self._task_relations]
+
+    # TODO init DAG's tasks are in the same place
+    @property
+    def task_location(self) -> List[Dict]:
+        if not self.tasks:
+            return [self.tasks]
+        else:
+            return [{"taskCode": task_code, "x": 0, "y": 0} for task_code in 
self.tasks]
+
+    @property
+    def task_list(self) -> List["Task"]:
+        return list(self.tasks.values())
+
+    def _handle_root_relation(self):
+        from pydolphinscheduler.core.task import TaskRelation
+        post_relation_code = set()
+        for relation in self._task_relations:
+            post_relation_code.add(relation.post_task_code)
+        for task in self.task_list:
+            if task.code not in post_relation_code:
+                root_relation = TaskRelation(pre_task_code=0, 
post_task_code=task.code)
+                self._task_relations.add(root_relation)
+
+    def add_task(self, task: "Task") -> None:
+        self.tasks[task.code] = task
+        task._process_definition = self
+
+    def add_tasks(self, tasks: List["Task"]) -> None:
+        for task in tasks:
+            self.add_task(task)
+
+    def get_task(self, code: str) -> "Task":
+        """
+        Look up a task of this process definition by its code
+
+        :param code: key the task was stored under in ``self.tasks``
+        :raises ValueError: no task is stored under ``code``
+        :return: the matching task
+        """
+        if code not in self.tasks:
+            # Interpolate the values into the message; the previous form handed the
+            # template and a tuple to ValueError as separate args and never formatted them
+            raise ValueError(
+                f"Task with code {code} can not be found in process definition {self.name}"
+            )
+        return self.tasks[code]
+
+    # TODO which typing should return in this case
+    def get_tasks_by_name(self, name: str) -> Set["Task"]:
+        find = set()
+        for task in self.tasks.values():
+            if task.name == name:
+                find.add(task)
+        return find
+
+    def get_one_task_by_name(self, name: str) -> "Task":
+        tasks = self.get_tasks_by_name(name)
+        if not tasks:
+            raise ValueError(f"Can not find task with name {name}.")
+        return tasks.pop()
+
+    def run(self):
+        """
+        Run ProcessDefinition instance, a shortcut for :ref: submit and :ref: 
start
+        Only support manual for now, schedule run will coming soon
+        :return:
+        """
+        self.submit()
+        self.start()
+
+    def _ensure_side_model_exists(self):
+        """
+        Ensure side model exists which including :ref: Project, Tenant, User.
+        If those model not exists, would create default value in :ref: 
ProcessDefinitionDefault
+        """
+        # TODO used metaclass for more pythonic
+        self.tenant.create_if_not_exists(self._queue)
+        # model User have to create after Tenant created
+        self.user.create_if_not_exists()
+        # Project model need User object exists
+        self.project.create_if_not_exists(self._user)
+
+    def submit(self) -> int:
+        """
+        Submit ProcessDefinition instance to java gateway
+        :return:
+        """
+        self._ensure_side_model_exists()
+        gateway = launch_gateway()
+        self._process_definition_code = 
gateway.entry_point.createOrUpdateProcessDefinition(
+            self._user,
+            self._project,
+            self.name,
+            str(self.description) if self.description else "",
+            str(self.param) if self.param else None,
+            json.dumps(self.task_location),
+            self.timeout,
+            self._tenant,
+            # TODO add serialization function
+            json.dumps(self.task_relation_json),
+            json.dumps(self.task_definition_json),
+        )
+        return self._process_definition_code
+
+    def start(self) -> None:
+        """
+        Start ProcessDefinition instance which post to 
`start-process-instance` to java gateway
+        :return:
+        """
+        gateway = launch_gateway()
+        gateway.entry_point.execProcessInstance(
+            self._user,
+            self._project,
+            self.name,
+            "",
+            "default",
+            24 * 3600,
+        )
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
new file mode 100644
index 0000000..fa97c76
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -0,0 +1,237 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional, List, Dict, Set, Union, Sequence, Tuple
+
+from pydolphinscheduler.constants import TaskPriority, 
ProcessDefinitionDefault, TaskFlag, TaskTimeoutFlag, \
+    DefaultTaskCodeNum, JavaGatewayDefault
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.process_definition import ProcessDefinitionContext
+from pydolphinscheduler.java_gateway import launch_gateway, 
gateway_result_checker
+from pydolphinscheduler.utils.string import snake2camel, class_name2camel
+
+
+class ObjectJsonBase:
+    """Base for plain objects that are rendered from their instance attributes."""
+
+    # Extra entries merged into the result of to_dict(), overriding same-named keys
+    DEFAULT_ATTR = {}
+
+    def __init__(self, *args, **kwargs):
+        """Accept and ignore any arguments so subclasses can call ``super().__init__()``."""
+        # Was spelled ``__int__``, which defined an integer-conversion hook instead of
+        # a constructor
+
+    def __str__(self) -> str:
+        """Render as ``"<name>":{"<key>": <value>,...}`` from every instance attribute."""
+        content = []
+        for attribute, value in self.__dict__.items():
+            content.append(f"\"{snake2camel(attribute)}\": {value}")
+        content = ",".join(content)
+        return f"\"{class_name2camel(type(self).__name__)}\":{{{content}}}"
+
+    # TODO check how Redash do
+    # TODO DRY
+    def to_dict(self) -> Dict:
+        """Return the instance attributes keyed through ``snake2camel``, updated with ``DEFAULT_ATTR``."""
+        content = {snake2camel(attr): value for attr, value in self.__dict__.items()}
+        content.update(self.DEFAULT_ATTR)
+        return content
+
+
+class TaskParams(ObjectJsonBase):
+    DEFAULT_CONDITION_RESULT = {
+        "successNode": [
+            ""
+        ],
+        "failedNode": [
+            ""
+        ]
+    }
+
+    def __init__(
+            self,
+            raw_script: str,
+            local_params: Optional[List] = None,
+            resource_list: Optional[List] = None,
+            dependence: Optional[Dict] = None,
+            wait_start_timeout: Optional[Dict] = None,
+            condition_result: Optional[Dict] = None,
+    ):
+        super().__init__()
+        self.raw_script = raw_script
+        self.local_params = local_params or []
+        self.resource_list = resource_list or []
+        self.dependence = dependence or {}
+        self.wait_start_timeout = wait_start_timeout or {}
+        # TODO need better way to handle it, this code just for POC
+        self.condition_result = condition_result or 
self.DEFAULT_CONDITION_RESULT
+
+
+class TaskRelation(ObjectJsonBase):
+    """
+    Directed relation between two task codes
+
+    Instances compare and hash by the ``(pre_task_code, post_task_code)`` pair, so
+    adding the same relation to a set more than once keeps a single entry.
+    """
+
+    DEFAULT_ATTR = {
+        "name": "",
+        "preTaskVersion": 1,
+        "postTaskVersion": 1,
+        "conditionType": 0,
+        "conditionParams": {}
+    }
+
+    def __init__(
+            self,
+            pre_task_code: int,
+            post_task_code: int,
+    ):
+        super().__init__()
+        self.pre_task_code = pre_task_code
+        self.post_task_code = post_task_code
+
+    def __eq__(self, other):
+        # Without value equality a set falls back to identity and never drops duplicates
+        if not isinstance(other, TaskRelation):
+            return NotImplemented
+        return (self.pre_task_code, self.post_task_code) == \
+               (other.pre_task_code, other.post_task_code)
+
+    def __hash__(self):
+        # Both endpoints take part in the hash; post_task_code used to be hashed twice,
+        # so every relation pointing at the same task collided
+        return hash((self.pre_task_code, self.post_task_code))
+
+
+class Task(Base):
+
+    DEFAULT_DEPS_ATTR = {
+        "name": "",
+        "preTaskVersion": 1,
+        "postTaskVersion": 1,
+        "conditionType": 0,
+        "conditionParams": {}
+    }
+
+    def __init__(
+            self,
+            name: str,
+            task_type: str,
+            task_params: TaskParams,
+            description: Optional[str] = None,
+            flag: Optional[str] = TaskFlag.YES,
+            task_priority: Optional[str] = TaskPriority.MEDIUM,
+            worker_group: Optional[str] = 
ProcessDefinitionDefault.WORKER_GROUP,
+            delay_time: Optional[int] = 0,
+            fail_retry_times: Optional[int] = 0,
+            fail_retry_interval: Optional[int] = 1,
+            timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE,
+            timeout_notify_strategy: Optional = None,
+            timeout: Optional[int] = 0,
+            process_definition: Optional[ProcessDefinition] = None,
+    ):
+
+        super().__init__(name, description)
+        self.task_type = task_type
+        self.task_params = task_params
+        self.flag = flag
+        self.task_priority = task_priority
+        self.worker_group = worker_group
+        self.fail_retry_times = fail_retry_times
+        self.fail_retry_interval = fail_retry_interval
+        self.delay_time = delay_time
+        self.timeout_flag = timeout_flag
+        self.timeout_notify_strategy = timeout_notify_strategy
+        self.timeout = timeout
+        self._process_definition = None
+        self.process_definition: ProcessDefinition = process_definition or 
ProcessDefinitionContext.get()
+        self._upstream_task_codes: Set[int] = set()
+        self._downstream_task_codes: Set[int] = set()
+        self._task_relation: Set[TaskRelation] = set()
+        # move attribute code and version after _process_definition and 
process_definition declare
+        self.code, self.version = self.gen_code_and_version()
+        # Add task to process definition, maybe we could put into property process_definition later
+        if self.process_definition is not None and self.code not in 
self.process_definition.tasks:
+            self.process_definition.add_task(self)
+
+    @property
+    def process_definition(self) -> Optional[ProcessDefinition]:
+        if self._process_definition:
+            return self._process_definition
+        else:
+            raise ValueError(f'Task {self} has not been assigned to a 
ProcessDefinition yet')
+
+    @process_definition.setter
+    def process_definition(self, process_definition: 
Optional[ProcessDefinition]):
+        self._process_definition = process_definition
+
+    def __hash__(self):
+        return hash(self.code)
+
+    def __lshift__(self, other: Union["Task", Sequence["Task"]]):
+        """Implements Task << Task"""
+        self.set_upstream(other)
+        return other
+
+    def __rshift__(self, other: Union["Task", Sequence["Task"]]):
+        """Implements Task >> Task"""
+        self.set_downstream(other)
+        return other
+
+    def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
+        """Called for [Task] >> Task because list don't have __rshift__ operators."""
+        # ``other`` is the left operand, so it becomes upstream of this task
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
+        """Called for [Task] << Task because list don't have __lshift__ operators."""
+        # ``other`` is the left operand, so it becomes downstream of this task
+        self.__rshift__(other)
+        return self
+
+    def _set_deps(self, tasks: Union["Task", Sequence["Task"]], upstream: bool 
= True) -> None:
+        if not isinstance(tasks, Sequence):
+            tasks = [tasks]
+
+        for task in tasks:
+            if upstream:
+                self._upstream_task_codes.add(task.code)
+                task._downstream_task_codes.add(self.code)
+
+                if self._process_definition:
+                    task_relation = TaskRelation(
+                        pre_task_code=task.code,
+                        post_task_code=self.code,
+                    )
+                    self.process_definition._task_relations.add(task_relation)
+            else:
+                self._downstream_task_codes.add(task.code)
+                task._upstream_task_codes.add(self.code)
+
+                if self._process_definition:
+                    task_relation = TaskRelation(
+                        pre_task_code=self.code,
+                        post_task_code=task.code,
+                    )
+                    self.process_definition._task_relations.add(task_relation)
+
+    def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+        self._set_deps(tasks, upstream=True)
+
+    def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+        self._set_deps(tasks, upstream=False)
+
+    # TODO code should better generate in bulk mode when :ref: 
processDefinition run submit or start
+    def gen_code_and_version(self) -> Tuple:
+        # TODO get code from specific project process definition and task name
+        gateway = launch_gateway()
+        result = 
gateway.entry_point.getCodeAndVersion(self.process_definition._project, 
self.name)
+        # result = 
gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
+        # gateway_result_checker(result)
+        return result.get("code"), result.get("version")
+
+    def to_dict(self, camel_attr=True) -> Dict:
+        content = {}
+        for attr, value in self.__dict__.items():
+            # Don't publish private variables
+            if attr.startswith("_"):
+                continue
+            elif isinstance(value, TaskParams):
+                content[snake2camel(attr)] = value.to_dict()
+            else:
+                content[snake2camel(attr)] = value
+        return content
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
new file mode 100644
index 0000000..e93e8f1
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from py4j.java_collections import JavaMap
+from py4j.java_gateway import JavaGateway, GatewayParameters
+
+from pydolphinscheduler.constants import JavaGatewayDefault
+
+
+def launch_gateway() -> JavaGateway:
+    """
+    Build a ``JavaGateway`` with automatic type conversion switched on
+
+    :return: a new gateway created with ``GatewayParameters(auto_convert=True)``
+    """
+    # TODO Note that automatic conversion makes calling Java methods slightly less efficient because
+    #  in the worst case, Py4J needs to go through all registered converters for all parameters.
+    #  This is why automatic conversion is disabled by default.
+    gateway = JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))
+    return gateway
+
+
+def gateway_result_checker(
+        result: JavaMap,
+        msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS
+) -> Any:
+    """
+    Validate a result map returned by a Java gateway call
+
+    :param result: map returned by a gateway entry point
+    :param msg_check: message the result is required to carry; ``None``
+        skips the message comparison
+    :return: ``result`` itself when every check passes
+    :raises RuntimeError: when the result status is not the success status
+    :raises ValueError: when ``msg_check`` is given and the result message differs
+    """
+    status = result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString()
+    if status != JavaGatewayDefault.RESULT_STATUS_SUCCESS:
+        # Report the status actually received so the failure can be diagnosed
+        raise RuntimeError(f"Failed when try to got result for java gateway, status: {status}")
+    if msg_check is not None:
+        message = result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD]
+        if message != msg_check:
+            raise ValueError(f"Get result state not success, message: {message}")
+    return result
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
new file mode 100644
index 0000000..2f376a5
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pydolphinscheduler.side.project import Project
+from pydolphinscheduler.side.tenant import Tenant
+from pydolphinscheduler.side.user import User
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
new file mode 100644
index 0000000..b118be9
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.core.base_side import BaseSide
+from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.java_gateway import launch_gateway, 
gateway_result_checker
+
+
+class Project(BaseSide):
+    """
+    Project, identified by a name and an optional description
+    """
+
+    def __init__(
+            self,
+            name: str = ProcessDefinitionDefault.PROJECT,
+            description: Optional[str] = None
+    ):
+        super().__init__(name, description)
+
+    def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
+        """
+        Create Project if not exists
+
+        Sends ``user``, ``self.name`` and ``self.description`` to the gateway
+        entry point ``createProject``. The returned result is currently not
+        checked.
+
+        :param user: user name handed to the gateway
+        """
+        gateway = launch_gateway()
+        result = gateway.entry_point.createProject(user, self.name, self.description)
+        # TODO recover result checker
+        # gateway_result_checker(result, None)
+
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
new file mode 100644
index 0000000..4c0d1f6
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core.base_side import BaseSide
+from pydolphinscheduler.java_gateway import launch_gateway, 
gateway_result_checker
+
+
+class Queue(BaseSide):
+    """
+    Queue, identified by a name and an optional description
+    """
+
+    def __init__(
+            self,
+            name: str = ProcessDefinitionDefault.QUEUE,
+            description: Optional[str] = ""
+    ):
+        super().__init__(name, description)
+
+    def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
+        """
+        Create Queue if not exists
+
+        The gateway result is validated with ``gateway_result_checker`` and
+        the message comparison is skipped (``msg_check=None``).
+
+        :param user: user name handed to the gateway
+        """
+        gateway = launch_gateway()
+        # Here we set Queue.name and Queue.queueName same as self.name
+        # NOTE(review): this invokes the ``createProject`` entry point, the same
+        # one Project.create_if_not_exists uses, with self.name in the
+        # description slot. It looks like a copy-paste slip; confirm which
+        # gateway method is meant to create a queue.
+        result = gateway.entry_point.createProject(user, self.name, self.name)
+        gateway_result_checker(result, None)
\ No newline at end of file
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
new file mode 100644
index 0000000..9cba533
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core.base_side import BaseSide
+from pydolphinscheduler.java_gateway import launch_gateway, 
gateway_result_checker
+
+
+class Tenant(BaseSide):
+    """
+    Tenant, identified by a name, a queue name and an optional description
+    """
+
+    def __init__(
+            self,
+            name: str = ProcessDefinitionDefault.TENANT,
+            queue: str = ProcessDefinitionDefault.QUEUE,
+            description: Optional[str] = None
+    ):
+        super().__init__(name, description)
+        self.queue = queue
+
+    def create_if_not_exists(self, queue_name: str, user=ProcessDefinitionDefault.USER) -> None:
+        """
+        Create Tenant if not exists
+
+        Sends ``self.name``, ``self.description`` and ``queue_name`` to the
+        gateway entry point ``createTenant``. The returned result is currently
+        not checked.
+
+        :param queue_name: queue name handed to the gateway
+        :param user: accepted but not forwarded to the gateway
+        """
+        gateway = launch_gateway()
+        # NOTE(review): the call uses the ``queue_name`` argument, not the
+        # ``self.queue`` stored by __init__; confirm that this is intended.
+        result = gateway.entry_point.createTenant(self.name, self.description, queue_name)
+        # gateway_result_checker(result, None)
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
new file mode 100644
index 0000000..fc7c339
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.core.base_side import BaseSide
+from pydolphinscheduler.java_gateway import launch_gateway, 
gateway_result_checker
+
+
+class User(BaseSide):
+    """
+    User, carrying the account attributes sent to the Java gateway
+    """
+
+    # Names of the attributes set by __init__
+    _KEY_ATTR = {
+        "name",
+        "password",
+        "email",
+        "phone",
+        "tenant",
+        "queue",
+        "status",
+    }
+
+    def __init__(
+            self,
+            name: str,
+            password: str,
+            email: str,
+            phone: str,
+            tenant: str,
+            queue: Optional[str] = None,
+            status: Optional[int] = 1,
+    ):
+        # Only the name goes to the base class; no description is passed
+        super().__init__(name)
+        self.password = password
+        self.email = email
+        self.phone = phone
+        self.tenant = tenant
+        self.queue = queue
+        self.status = status
+
+    def create_if_not_exists(self, **kwargs):
+        """
+        Create User if not exists
+
+        Sends name, password, email, phone, tenant, queue and status, in that
+        order, to the gateway entry point ``createUser``. The returned result
+        is currently not checked.
+
+        :param kwargs: accepted but not read by this method
+        """
+        gateway = launch_gateway()
+        result = gateway.entry_point.createUser(
+            self.name,
+            self.password,
+            self.email,
+            self.phone,
+            self.tenant,
+            self.queue,
+            self.status
+        )
+        # TODO recover result checker
+        # gateway_result_checker(result, None)
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
new file mode 100644
index 0000000..d4b1bb4
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.core.base_side import BaseSide
+
+
+class WorkerGroup(BaseSide):
+    """
+    Worker Group, identified by a name, an address and an optional description
+    """
+
+    def __init__(
+            self,
+            name: str,
+            address: str,
+            description: Optional[str] = None
+    ):
+        # name and description are handled by the base class
+        super().__init__(name, description)
+        self.address = address
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
new file mode 100644
index 0000000..e60c78b
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task, TaskParams
+
+
+class Shell(Task):
+    """
+    Task whose parameters carry a shell command as ``raw_script``
+    """
+
+    # TODO maybe we could use instance name to replace attribute `name`
+    # which is simplify as `task_shell = Shell(command = "echo 1")` and
+    # task.name assign to `task_shell`
+    def __init__(
+            self,
+            name: str,
+            command: str,
+            task_type: str = TaskType.SHELL,
+            *args, **kwargs
+    ):
+        """
+        :param name: task name, forwarded to ``Task``
+        :param command: shell command, wrapped into ``TaskParams(raw_script=command)``
+        :param task_type: task type, ``TaskType.SHELL`` by default
+        :param args: extra positional arguments forwarded to ``Task``
+        :param kwargs: extra keyword arguments forwarded to ``Task``
+        """
+        task_params = TaskParams(raw_script=command)
+        super().__init__(name, task_type, task_params, *args, **kwargs)
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
new file mode 100644
index 0000000..c3bab71
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+def attr2camel(attr: str, include_private=True):
+    """
+    Convert an attribute name to camelCase
+
+    :param attr: attribute name in snake_case
+    :param include_private: when true, leading underscores are stripped
+        before the conversion
+    :return: result of ``snake2camel`` on the (possibly stripped) name
+    """
+    name = attr.lstrip("_") if include_private else attr
+    return snake2camel(name)
+
+
+def snake2camel(snake: str):
+    """
+    Convert a snake_case string to camelCase
+
+    The part before the first underscore is kept as is; every following
+    part is title-cased and appended.
+
+    :param snake: string in snake_case
+    :return: the camelCase form of ``snake``
+    """
+    head, *tail = snake.split("_")
+    return head + "".join(map(str.title, tail))
+
+
+def class_name2camel(class_name: str):
+    """
+    Lower-case the first character of a class name
+
+    :param class_name: class name, e.g. ``ProcessDefinition``
+    :return: the name with its first character lower-cased; an empty string
+        is returned unchanged
+    """
+    # Slicing instead of indexing keeps the empty string from raising IndexError
+    return class_name[:1].lower() + class_name[1:]
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/__init__.py 
b/dolphinscheduler-python/pydolphinscheduler/test/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py 
b/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py
 
b/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py
new file mode 100644
index 0000000..7155447
--- /dev/null
+++ 
b/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault, 
ProcessDefinitionReleaseState
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.side import Tenant, Project, User
+
+TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
+
+
+@pytest.mark.parametrize(
+    "func",
+    [
+        "run", "submit", "start"
+    ]
+)
+def test_process_definition_key_attr(func):
+    """ProcessDefinition instances must expose the attribute named by ``func``."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        assert hasattr(pd, func), f"ProcessDefinition instance don't have attribute `{func}`"
+
+
+@pytest.mark.parametrize(
+    "name,value",
+    [
+        ("project", Project(ProcessDefinitionDefault.PROJECT)),
+        ("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
+        ("user", User(ProcessDefinitionDefault.USER,
+                      ProcessDefinitionDefault.USER_PWD,
+                      ProcessDefinitionDefault.USER_EMAIL,
+                      ProcessDefinitionDefault.USER_PHONE,
+                      ProcessDefinitionDefault.TENANT,
+                      ProcessDefinitionDefault.QUEUE,
+                      ProcessDefinitionDefault.USER_STATE)),
+        ("release_state", ProcessDefinitionReleaseState.ONLINE),
+    ],
+)
+def test_process_definition_default_value(name, value):
+    """A ProcessDefinition built with only a name must equal ``value`` on attribute ``name``."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        assert getattr(pd, name) == value, \
+            f"ProcessDefinition instance attribute `{name}` have not except default value `{getattr(pd, name)}`"
+
+
+@pytest.mark.parametrize(
+    "name,cls,expect",
+    [
+        ("project", Project, "project"),
+        ("tenant", Tenant, "tenant"),
+    ],
+)
+def test_process_definition_set_attr(name, cls, expect):
+    """Setting attribute ``name`` to ``cls(expect)`` must read back as an equal object."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        setattr(pd, name, cls(expect))
+        assert getattr(pd, name) == cls(
+            expect), f"ProcessDefinition set attribute `{name}` do not work expect"
+
+
+def test_process_definition_to_dict_without_task():
+    """``to_dict`` of a ProcessDefinition holding no task must match the expected mapping."""
+    expected = dict(
+        name=TEST_PROCESS_DEFINITION_NAME,
+        description=None,
+        project=ProcessDefinitionDefault.PROJECT,
+        tenant=ProcessDefinitionDefault.TENANT,
+        timeout=0,
+        releaseState=ProcessDefinitionReleaseState.ONLINE,
+        param=None,
+        tasks={},
+        taskDefinitionJson=[{}],
+        taskRelationJson=[{}],
+    )
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        actual = pd.to_dict()
+        assert actual == expected
+
+
+def test_process_definition_simple():
+    """Build a linear chain of tasks inside a ProcessDefinition and check the recorded dependencies."""
+    expect_tasks_num = 5
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        for i in range(expect_tasks_num):
+            task_params = TaskParams(raw_script=f"test-raw-script-{i}")
+            curr_task = Task(name=f"task-{i}", task_type=f"type-{i}", task_params=task_params)
+            # Set deps task i as i-1 parent
+            if i > 0:
+                pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
+                curr_task.set_upstream(pre_task)
+        assert len(pd.tasks) == expect_tasks_num
+
+        # Test if task process_definition same as origin one
+        task: Task = pd.get_one_task_by_name("task-0")
+        assert pd is task.process_definition
+
+        # Test if all tasks with expect deps
+        for i in range(expect_tasks_num):
+            task: Task = pd.get_one_task_by_name(f"task-{i}")
+            if i == 0:
+                # First task of the chain: no parent, one child
+                assert task._upstream_task_codes == set()
+                assert task._downstream_task_codes == {pd.get_one_task_by_name("task-1").code}
+            elif i == expect_tasks_num - 1:
+                # Last task of the chain: one parent, no child
+                assert task._upstream_task_codes == {pd.get_one_task_by_name(f"task-{i - 1}").code}
+                assert task._downstream_task_codes == set()
+            else:
+                assert task._upstream_task_codes == {pd.get_one_task_by_name(f"task-{i - 1}").code}
+                assert task._downstream_task_codes == {pd.get_one_task_by_name(f"task-{i + 1}").code}
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/core/test_task.py 
b/dolphinscheduler-python/pydolphinscheduler/test/core/test_task.py
new file mode 100644
index 0000000..1d89b07
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/core/test_task.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from unittest.mock import patch
+
+from pydolphinscheduler.core.task import TaskParams, TaskRelation, Task
+
+
+def test_task_params_to_dict():
+    """``TaskParams.to_dict`` must return the camelCase mapping for a raw script."""
+    script = "test_task_params_to_dict"
+    expected = dict(
+        resourceList=[],
+        localParams=[],
+        rawScript=script,
+        dependence={},
+        conditionResult=TaskParams.DEFAULT_CONDITION_RESULT,
+        waitStartTimeout={},
+    )
+    actual = TaskParams(raw_script=script).to_dict()
+    assert actual == expected
+
+
+def test_task_relation_to_dict():
+    """``TaskRelation.to_dict`` must return the camelCase mapping for a pre/post task code pair."""
+    pre_task_code = 123
+    post_task_code = 456
+    expect = {
+        "name": "",
+        "preTaskCode": pre_task_code,
+        "postTaskCode": post_task_code,
+        "preTaskVersion": 1,
+        "postTaskVersion": 1,
+        "conditionType": 0,
+        "conditionParams": {}
+    }
+    task_relation = TaskRelation(pre_task_code=pre_task_code, post_task_code=post_task_code)
+    assert task_relation.to_dict() == expect
+
+
+def test_task_to_dict():
+    """``Task.to_dict`` must return the camelCase mapping, with nested task params expanded."""
+    code = "123"
+    name = "test_task_to_dict"
+    task_type = "test_task_to_dict_type"
+    raw_script = "test_task_params_to_dict"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": task_type,
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "rawScript": raw_script,
+            "dependence": {},
+            "conditionResult": {
+                "successNode": [
+                    ""
+                ],
+                "failedNode": [
+                    ""
+                ]
+            },
+            "waitStartTimeout": {}
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "worker-group-pydolphin",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0
+    }
+    # NOTE(review): this patches ``Task.gen_code``, while the code-fetching method
+    # defined elsewhere in this change is named ``gen_code_and_version`` and returns a
+    # (code, version) pair - confirm the patch target exists on Task.
+    with patch('pydolphinscheduler.core.task.Task.gen_code', return_value=code):
+        task = Task(
+            name=name,
+            task_type=task_type,
+            task_params=TaskParams(raw_script)
+        )
+        assert task.to_dict() == expect
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/test/example/__init__.py 
b/dolphinscheduler-python/pydolphinscheduler/test/example/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/example/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/tasks/__init__.py 
b/dolphinscheduler-python/pydolphinscheduler/test/tasks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/tasks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py 
b/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py
new file mode 100644
index 0000000..ff1b7b5
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.shell import Shell
+
+
+def test_shell_to_dict():
+    code = "123"
+    name = "test_shell_to_dict"
+    command = "echo test shell"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "SHELL",
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "rawScript": command,
+            "dependence": {},
+            "conditionResult": {
+                "successNode": [
+                    ""
+                ],
+                "failedNode": [
+                    ""
+                ]
+            },
+            "waitStartTimeout": {}
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "worker-group-pydolphin",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0
+    }
+    with patch('pydolphinscheduler.core.task.Task.gen_code', 
return_value=code):
+        shell = Shell(name, command)
+        assert shell.to_dict() == expect
diff --git 
a/dolphinscheduler-python/pydolphinscheduler/test/test_java_gateway.py 
b/dolphinscheduler-python/pydolphinscheduler/test/test_java_gateway.py
new file mode 100644
index 0000000..200c06d
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/test_java_gateway.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from py4j.java_gateway import java_import, JavaGateway
+
+
+def test_gateway_connect():
+    gateway = JavaGateway()
+    app = gateway.entry_point
+    assert app.ping() == "PONG"
+
+
+def test_jvm_simple():
+    gateway = JavaGateway()
+    smaller = gateway.jvm.java.lang.Integer.MIN_VALUE
+    bigger = gateway.jvm.java.lang.Integer.MAX_VALUE
+    assert bigger > smaller
+
+
+def test_python_client_java_import_single():
+    gateway = JavaGateway()
+    java_import(gateway.jvm, 
"org.apache.dolphinscheduler.common.utils.FileUtils")
+    assert hasattr(gateway.jvm, "FileUtils")
+
+
+def test_python_client_java_import_package():
+    gateway = JavaGateway()
+    java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.*")
+    # test if jvm view have some common utils
+    for util in ("FileUtils", "OSUtils", "DateUtils"):
+        assert hasattr(gateway.jvm, util)
diff --git 
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
 
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
new file mode 100644
index 0000000..4a16ab4
--- /dev/null
+++ 
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.QueueService;
+import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
+import org.apache.dolphinscheduler.api.service.TenantService;
+import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
+
+import py4j.GatewayServer;
+
+@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
+    @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
+        "org.apache.dolphinscheduler.server.master.*",
+        "org.apache.dolphinscheduler.server.worker.*",
+        "org.apache.dolphinscheduler.server.monitor.*",
+        "org.apache.dolphinscheduler.server.log.*"
+    })
+})
+public class PythonGatewayServer extends SpringBootServletInitializer {
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private TenantService tenantService;
+
+    @Autowired
+    private ExecutorService executorService;
+
+    @Autowired
+    private ProcessDefinitionService processDefinitionService;
+
+    @Autowired
+    private TaskDefinitionService taskDefinitionService;
+
+    @Autowired
+    private UsersService usersService;
+
+    @Autowired
+    private QueueService queueService;
+
+    @Autowired
+    private ProjectMapper projectMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    // TODO replace this user to build in admin user if we make sure build in 
one could not be change
+    private final User dummyAdminUser = new User() {
+        {
+            setId(Integer.MAX_VALUE);
+            setUserName("dummyUser");
+            setUserType(UserType.ADMIN_USER);
+        }
+    };
+
+    private final Queue queuePythonGateway = new Queue() {
+        {
+            setId(Integer.MAX_VALUE);
+            setQueueName("queuePythonGateway");
+        }
+    };
+
+    public String ping() {
+        return "PONG";
+    }
+
+    // TODO Should we import package in python client side? utils package can 
but service can not, why
+    // Core api
+    public Map<String, Object> genTaskCodeList(Integer genNum) {
+        return taskDefinitionService.genTaskCodeList(genNum);
+    }
+
+    public Map<String, Long> getCodeAndVersion(String projectName, String 
taskName) throws SnowFlakeUtils.SnowFlakeException {
+        Project project = projectMapper.queryByName(projectName);
+        Map<String, Long> result = new HashMap<>();
+        // project do not exists, mean task not exists too, so we should 
directly return init value
+        if (project == null) {
+            result.put("code", SnowFlakeUtils.getInstance().nextId());
+            result.put("version", 0L);
+            return result;
+        }
+        TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByName(project.getCode(), taskName);
+        if (taskDefinition == null) {
+            result.put("code", SnowFlakeUtils.getInstance().nextId());
+            result.put("version", 0L);
+        } else {
+            result.put("code", taskDefinition.getCode());
+            result.put("version", (long) taskDefinition.getVersion());
+        }
+        return result;
+    }
+
+    /**
+     * create or update process definition.
+     * If process definition do not exists in Project=`projectCode` would 
create a new one
+     * If process definition already exists in Project=`projectCode` would 
update it
+     * All requests
+     * <p>
+     *
+     * @param name               process definition name
+     * @param description        description
+     * @param globalParams       global params
+     * @param locations          locations for nodes
+     * @param timeout            timeout
+     * @param tenantCode         tenantCode
+     * @param taskRelationJson   relation json for nodes
+     * @param taskDefinitionJson taskDefinitionJson
+     * @return create result code
+     */
+    public Long createOrUpdateProcessDefinition(String userName,
+                                                String projectName,
+                                                String name,
+                                                String description,
+                                                String globalParams,
+                                                String locations,
+                                                int timeout,
+                                                String tenantCode,
+                                                String taskRelationJson,
+                                                String taskDefinitionJson) {
+        User user = usersService.queryUser(userName);
+        Project project = (Project) projectService.queryByName(user, 
projectName).get(Constants.DATA_LIST);
+        long projectCode = project.getCode();
+        Map<String, Object> verifyProcessDefinitionExists = 
processDefinitionService.verifyProcessDefinitionName(user, projectCode, name);
+
+        if (verifyProcessDefinitionExists.get(Constants.STATUS) != 
Status.SUCCESS) {
+            // update process definition
+            ProcessDefinition processDefinition = 
processDefinitionMapper.queryByDefineName(projectCode, name);
+            long processDefinitionCode = processDefinition.getCode();
+            // make sure process definition offline which could edit
+            processDefinitionService.releaseProcessDefinition(user, 
projectCode, processDefinitionCode, ReleaseState.OFFLINE);
+            Map<String, Object> result = 
processDefinitionService.updateProcessDefinition(user, projectCode, name, 
processDefinitionCode, description, globalParams,
+                locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson);
+            return processDefinitionCode;
+        } else {
+            // create process definition
+            Map<String, Object> result = 
processDefinitionService.createProcessDefinition(user, projectCode, name, 
description, globalParams,
+                locations, timeout, tenantCode, taskRelationJson, 
taskDefinitionJson);
+            ProcessDefinition processDefinition = (ProcessDefinition) 
result.get(Constants.DATA_LIST);
+            return processDefinition.getCode();
+        }
+    }
+
+    public void execProcessInstance(String userName,
+                                    String projectName,
+                                    String processDefinitionName,
+                                    String cronTime,
+                                    String workerGroup,
+                                    Integer timeout
+    ) {
+        User user = usersService.queryUser(userName);
+        Project project = projectMapper.queryByName(projectName);
+        ProcessDefinition processDefinition = 
processDefinitionMapper.queryByDefineName(project.getCode(), 
processDefinitionName);
+
+        // temp default value
+        FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
+        TaskDependType taskDependType = TaskDependType.TASK_POST;
+        WarningType warningType = WarningType.NONE;
+        RunMode runMode = RunMode.RUN_MODE_SERIAL;
+        Priority priority = Priority.MEDIUM;
+        int warningGroupId = 0;
+        Long environmentCode = -1L;
+        Map<String, String> startParams = null;
+        Integer expectedParallelismNumber = null;
+        String startNodeList = null;
+
+        // make sure process definition online
+        processDefinitionService.releaseProcessDefinition(user, 
project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
+
+        executorService.execProcessInstance(user,
+            project.getCode(),
+            processDefinition.getCode(),
+            cronTime,
+            null,
+            failureStrategy,
+            startNodeList,
+            taskDependType,
+            warningType,
+            warningGroupId,
+            runMode,
+            priority,
+            workerGroup,
+            environmentCode,
+            timeout,
+            startParams,
+            expectedParallelismNumber,
+            0
+        );
+    }
+
+    // side object
+    public Map<String, Object> createProject(String userName, String name, 
String desc) {
+        User user = usersService.queryUser(userName);
+        return projectService.createProject(user, name, desc);
+    }
+
+    public Map<String, Object> createQueue(String name, String queueName) {
+        Result<Object> verifyQueueExists = queueService.verifyQueue(name, 
queueName);
+        if (verifyQueueExists.getCode() == 0) {
+            return queueService.createQueue(dummyAdminUser, name, queueName);
+        } else {
+            Map<String, Object> result = new HashMap<>();
+            // TODO function putMsg do not work here
+            result.put(Constants.STATUS, Status.SUCCESS);
+            result.put(Constants.MSG, Status.SUCCESS.getMsg());
+            return result;
+        }
+    }
+
+    public Map<String, Object> createTenant(String tenantCode, String desc, 
String queueName) throws Exception {
+        if (tenantService.checkTenantExists(tenantCode)) {
+            Map<String, Object> result = new HashMap<>();
+            // TODO function putMsg do not work here
+            result.put(Constants.STATUS, Status.SUCCESS);
+            result.put(Constants.MSG, Status.SUCCESS.getMsg());
+            return result;
+        } else {
+            Result<Object> verifyQueueExists = 
queueService.verifyQueue(queueName, queueName);
+            if (verifyQueueExists.getCode() == 0) {
+                // TODO why create do not return id?
+                queueService.createQueue(dummyAdminUser, queueName, queueName);
+            }
+            Map<String, Object> result = 
queueService.queryQueueName(queueName);
+            List<Queue> queueList = (List<Queue>) 
result.get(Constants.DATA_LIST);
+            Queue queue = queueList.get(0);
+            return tenantService.createTenant(dummyAdminUser, tenantCode, 
queue.getId(), desc);
+        }
+    }
+
+    public void createUser(String userName,
+                           String userPassword,
+                           String email,
+                           String phone,
+                           String tenantCode,
+                           String queue,
+                           int state) {
+        User user = usersService.queryUser(userName);
+        if (Objects.isNull(user)) {
+            Map<String, Object> tenantResult = 
tenantService.queryByTenantCode(tenantCode);
+            Tenant tenant = (Tenant) tenantResult.get(Constants.DATA_LIST);
+            usersService.createUser(userName, userPassword, email, 
tenant.getId(), phone, queue, state);
+        }
+    }
+
+    @PostConstruct
+    public void run() {
+        GatewayServer server = new GatewayServer(this);
+        GatewayServer.turnLoggingOn();
+        // Start server to accept python client RPC
+        server.start();
+    }
+
+    public static void main(String[] args) {
+        SpringApplication.run(PythonGatewayServer.class, args);
+    }
+}
diff --git a/dolphinscheduler-standalone-server/pom.xml 
b/dolphinscheduler-standalone-server/pom.xml
index 8b9efc1..83ab68d 100644
--- a/dolphinscheduler-standalone-server/pom.xml
+++ b/dolphinscheduler-standalone-server/pom.xml
@@ -51,6 +51,10 @@
             <groupId>org.apache.dolphinscheduler</groupId>
             <artifactId>dolphinscheduler-alert</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-python</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
 
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
index e5a9b00..4a41770 100644
--- 
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
+++ 
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
@@ -66,7 +66,8 @@ public class StandaloneServer {
         new SpringApplicationBuilder(
                 ApiApplicationServer.class,
                 MasterServer.class,
-                WorkerServer.class
+                WorkerServer.class,
+                PythonGatewayServer.class
         ).run(args);
     }
 
diff --git a/pom.xml b/pom.xml
index b453ce9..5f9f897 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,7 @@
         <reflections.version>0.9.12</reflections.version>
         <byte-buddy.version>1.9.16</byte-buddy.version>
         <java-websocket.version>1.5.1</java-websocket.version>
+        <py4j.version>0.10.9</py4j.version>
     </properties>
 
     <dependencyManagement>
@@ -263,6 +264,11 @@
                 <artifactId>dolphinscheduler-spi</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-python</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>org.apache.curator</groupId>
@@ -678,6 +684,12 @@
                 <artifactId>javax.mail</artifactId>
                 <version>1.6.2</version>
             </dependency>
+
+            <dependency>
+                <groupId>net.sf.py4j</groupId>
+                <artifactId>py4j</artifactId>
+                <version>${py4j.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -1234,5 +1246,6 @@
         <module>dolphinscheduler-service</module>
         <module>dolphinscheduler-microbench</module>
         <module>dolphinscheduler-standalone-server</module>
+        <module>dolphinscheduler-python</module>
     </modules>
 </project>
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index b19e8b9..7474f00 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -201,6 +201,7 @@ protostuff-core-1.7.2.jar
 protostuff-runtime-1.7.2.jar
 protostuff-api-1.7.2.jar
 protostuff-collectionschema-1.7.2.jar
+py4j-0.10.9.jar
 quartz-2.3.0.jar
 quartz-jobs-2.3.0.jar
 reflections-0.9.12.jar

Reply via email to